package io.pravega.segmentstore.server.host.stat;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.common.Exceptions;
import io.pravega.common.ObjectClosedException;
import io.pravega.common.concurrent.Futures;
import io.pravega.common.util.BlockingDrainingQueue;
import io.pravega.common.util.SimpleCache;
import io.pravega.segmentstore.contracts.Attributes;
import io.pravega.segmentstore.contracts.StreamSegmentStore;
import io.pravega.shared.MetricsNames;
import io.pravega.shared.MetricsTags;
import io.pravega.shared.NameUtils;
import io.pravega.shared.metrics.Counter;
import io.pravega.shared.metrics.DynamicLogger;
import io.pravega.shared.metrics.MetricsProvider;
import io.pravega.shared.metrics.OpStatsLogger;
import io.pravega.shared.metrics.StatsLogger;
import io.pravega.shared.segment.ScaleType;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.Generated;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/pravega/segmentstore/server/host/stat/SegmentStatsRecorderImpl.class */
public class SegmentStatsRecorderImpl implements SegmentStatsRecorder {
    // Size argument handed to the SimpleCache that holds per-segment write contexts.
    private static final int MAX_CACHE_SIZE = 100000;
    // Maximum number of queued AppendInfo items requested from the append queue per take() call.
    private static final int MAX_APPEND_QUEUE_PROCESS_BATCH_SIZE = 10000;

    // Process-wide counters (created under the "global" metric names in the constructor).
    private final Counter globalSegmentWriteBytes;
    private final Counter globalSegmentWriteEvents;
    private final Counter globalSegmentReadBytes;

    // Latency / size distributions for create, read and write operations.
    private final OpStatsLogger createStreamSegment;
    private final OpStatsLogger readStreamSegment;
    private final OpStatsLogger writeStreamSegment;
    private final OpStatsLogger appendSizeDistribution;
    private final OpStatsLogger readSizeDistribution;

    // Used for the per-segment read_bytes counter (incremented in read(), frozen in deleteSegment()).
    private final DynamicLogger dynamicLogger;
    // Names of segments whose write context is currently being loaded from the store; prevents duplicate loads.
    private final Set<String> pendingCacheLoads;
    // Segment name -> write context (per-segment counters plus auto-scale aggregates).
    private final SimpleCache<String, SegmentWriteContext> cache;
    // Passed to SegmentAggregates.reportIfNeeded() to decide whether a report is due.
    private final Duration reportingDuration;
    // Receives created/sealed notifications and the periodic rate reports.
    private final AutoScaleProcessor reporter;
    // Queried for segment attributes when a write context has to be loaded lazily.
    private final StreamSegmentStore store;
    // Handle of the scheduled cache clean-up task; cancelled in close().
    private final ScheduledFuture<?> cacheCleanup;
    private final ScheduledExecutorService executor;
    // Appends are queued here by recordAppend() and processed in batches on the executor.
    private final BlockingDrainingQueue<AppendInfo> appendQueue;

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log = LoggerFactory.getLogger(SegmentStatsRecorderImpl.class);
    // Defaults used by the three-argument constructor.
    private static final Duration DEFAULT_REPORTING_DURATION = Duration.ofMinutes(2);
    private static final Duration DEFAULT_EXPIRY_DURATION = Duration.ofMinutes(20);
    // Used when scheduling the periodic cache clean-up task.
    private static final Duration CACHE_CLEANUP_INTERVAL = Duration.ofMinutes(2);
    // Timeout passed to StreamSegmentStore.getStreamSegmentInfo() when loading a segment's attributes.
    private static final Duration TIMEOUT = Duration.ofMinutes(1);
    private static final StatsLogger STATS_LOGGER = MetricsProvider.createStatsLogger("segmentstore");

    /* loaded from: input_file:io/pravega/segmentstore/server/host/stat/SegmentStatsRecorderImpl$AppendInfo.class */
    /**
     * Immutable description of one append, queued by {@code recordAppend} and consumed in batches by
     * {@code processAppendInfo}.
     */
    private static class AppendInfo {
        final String streamSegmentName;
        final long dataLength;
        final int numOfEvents;
        final Duration elapsed;

        /**
         * @param str      name of the segment that was appended to
         * @param j        number of bytes appended
         * @param i        number of events appended
         * @param duration time the append took
         */
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        @ConstructorProperties({"streamSegmentName", "dataLength", "numOfEvents", "elapsed"})
        public AppendInfo(String str, long j, int i, Duration duration) {
            this.streamSegmentName = str;
            this.dataLength = j;
            this.numOfEvents = i;
            this.elapsed = duration;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String getStreamSegmentName() {
            return this.streamSegmentName;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public long getDataLength() {
            return this.dataLength;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int getNumOfEvents() {
            return this.numOfEvents;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public Duration getElapsed() {
            return this.elapsed;
        }

        /**
         * Two instances are equal when all four fields are equal (null-safe for the reference fields).
         */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof AppendInfo)) {
                return false;
            }
            AppendInfo other = (AppendInfo) obj;
            if (!other.canEqual(this)) {
                return false;
            }
            return Objects.equals(getStreamSegmentName(), other.getStreamSegmentName())
                    && getDataLength() == other.getDataLength()
                    && getNumOfEvents() == other.getNumOfEvents()
                    && Objects.equals(getElapsed(), other.getElapsed());
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        protected boolean canEqual(Object obj) {
            return obj instanceof AppendInfo;
        }

        /**
         * Produces the same hash values as the original generated implementation (prime 59, 43 for null).
         */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public int hashCode() {
            final int prime = 59;
            String name = getStreamSegmentName();
            Duration took = getElapsed();
            int result = prime + (name == null ? 43 : name.hashCode());
            // Long.hashCode(v) is defined as (int) (v ^ (v >>> 32)).
            result = result * prime + Long.hashCode(getDataLength());
            result = result * prime + getNumOfEvents();
            result = result * prime + (took == null ? 43 : took.hashCode());
            return result;
        }

        /**
         * Renders every field under its own label. The previous version printed the segment name under
         * {@code numOfEvents}, the event count under {@code elapsed}, and dropped the elapsed time entirely.
         */
        @Override
        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public String toString() {
            return "SegmentStatsRecorderImpl.AppendInfo(streamSegmentName=" + getStreamSegmentName()
                    + ", dataLength=" + getDataLength()
                    + ", numOfEvents=" + getNumOfEvents()
                    + ", elapsed=" + getElapsed() + ")";
        }
    }

    /* loaded from: input_file:io/pravega/segmentstore/server/host/stat/SegmentStatsRecorderImpl$SegmentWrite.class */
    /**
     * Mutable accumulator of bytes and events written to a single segment within one processed batch.
     * Both totals start at zero.
     */
    private static class SegmentWrite {
        long bytes;
        int events;

        private SegmentWrite() {
            this.bytes = 0L;
            this.events = 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/pravega/segmentstore/server/host/stat/SegmentStatsRecorderImpl$SegmentWriteContext.class */
    /**
     * Per-segment state kept in the cache: the segment's write counters plus its current
     * {@link SegmentAggregates}. Closing the context closes both counters.
     */
    public static class SegmentWriteContext implements AutoCloseable {
        final Counter writeBytes;
        final Counter writeEvents;
        // volatile: replaced by policyUpdate() and read by the append/merge paths.
        private volatile SegmentAggregates segmentAggregates;

        /**
         * @param str               segment name, used to derive the metric tags of both counters
         * @param segmentAggregates initial aggregates for the segment
         */
        SegmentWriteContext(String str, SegmentAggregates segmentAggregates) {
            this.segmentAggregates = segmentAggregates;
            final String[] tags = MetricsTags.segmentTags(str);
            this.writeBytes = STATS_LOGGER.createCounter("pravega.segmentstore.segment.write_bytes", tags);
            this.writeEvents = STATS_LOGGER.createCounter("pravega.segmentstore.segment.write_events", tags);
        }

        /**
         * Adds the given amounts to the segment's byte and event counters.
         *
         * @param j bytes written
         * @param i events written
         */
        void recordWrite(long j, int i) {
            writeBytes.add(j);
            writeEvents.add(i);
        }

        /** Closes the byte counter, then the event counter. */
        @Override
        public void close() {
            writeBytes.close();
            writeEvents.close();
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public SegmentAggregates getSegmentAggregates() {
            return segmentAggregates;
        }

        @SuppressFBWarnings(justification = "generated code")
        @Generated
        public void setSegmentAggregates(SegmentAggregates segmentAggregates) {
            this.segmentAggregates = segmentAggregates;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a recorder that uses {@code DEFAULT_REPORTING_DURATION} (2 minutes) as the reporting duration
     * and {@code DEFAULT_EXPIRY_DURATION} (20 minutes) as the cache expiry duration.
     *
     * @param autoScaleProcessor       processor that receives segment notifications and rate reports
     * @param streamSegmentStore       store queried for segment attributes
     * @param scheduledExecutorService executor used for background work
     */
    public SegmentStatsRecorderImpl(AutoScaleProcessor autoScaleProcessor, StreamSegmentStore streamSegmentStore, ScheduledExecutorService scheduledExecutorService) {
        this(autoScaleProcessor, streamSegmentStore, DEFAULT_REPORTING_DURATION, DEFAULT_EXPIRY_DURATION, scheduledExecutorService);
    }

    /**
     * Creates a recorder with explicit reporting and cache-expiry durations, schedules the periodic cache
     * clean-up and starts the append queue processor.
     *
     * @param autoScaleProcessor       processor that receives segment notifications and rate reports
     * @param streamSegmentStore       store queried for segment attributes
     * @param duration                 reporting duration handed to {@code SegmentAggregates.reportIfNeeded}
     * @param duration2                expiry duration handed to the write-context cache
     * @param scheduledExecutorService executor used for clean-up, append processing and async reports
     * @throws NullPointerException if any argument is null
     */
    @VisibleForTesting
    SegmentStatsRecorderImpl(@NonNull AutoScaleProcessor autoScaleProcessor, @NonNull StreamSegmentStore streamSegmentStore, @NonNull Duration duration, @NonNull Duration duration2, @NonNull ScheduledExecutorService scheduledExecutorService) {
        // Validate before creating any metric so a rejected argument does not leave unclosed counters behind.
        if (autoScaleProcessor == null) {
            throw new NullPointerException("reporter is marked non-null but is null");
        }
        if (streamSegmentStore == null) {
            throw new NullPointerException("store is marked non-null but is null");
        }
        if (duration == null) {
            throw new NullPointerException("reportingDuration is marked non-null but is null");
        }
        if (duration2 == null) {
            throw new NullPointerException("expiryDuration is marked non-null but is null");
        }
        if (scheduledExecutorService == null) {
            throw new NullPointerException("executor is marked non-null but is null");
        }

        this.globalSegmentWriteBytes = STATS_LOGGER.createCounter(MetricsNames.globalMetricName("pravega.segmentstore.segment.write_bytes"), new String[0]);
        this.globalSegmentWriteEvents = STATS_LOGGER.createCounter(MetricsNames.globalMetricName("pravega.segmentstore.segment.write_events"), new String[0]);
        this.globalSegmentReadBytes = STATS_LOGGER.createCounter(MetricsNames.globalMetricName("pravega.segmentstore.segment.read_bytes"), new String[0]);
        this.createStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.create_latency_ms", new String[0]);
        this.readStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.read_latency_ms", new String[0]);
        this.writeStreamSegment = STATS_LOGGER.createStats("pravega.segmentstore.segment.write_latency_ms", new String[0]);
        this.appendSizeDistribution = STATS_LOGGER.createStats("pravega.segmentstore.segment.append_size", new String[0]);
        this.readSizeDistribution = STATS_LOGGER.createStats("pravega.segmentstore.segment.read_size", new String[0]);
        this.dynamicLogger = MetricsProvider.getDynamicLogger();

        this.executor = scheduledExecutorService;
        this.pendingCacheLoads = Collections.synchronizedSet(new HashSet<>());
        // Entries leaving the cache get their counters closed.
        this.cache = new SimpleCache<>(MAX_CACHE_SIZE, duration2, (segmentName, context) -> context.close());

        // scheduleAtFixedRate applies ONE TimeUnit to both the initial delay and the period. The previous call
        // passed the delay in milliseconds (120000) together with TimeUnit.MINUTES, postponing the first
        // clean-up by 120000 minutes. Express both values in milliseconds instead.
        final long cleanupIntervalMillis = CACHE_CLEANUP_INTERVAL.toMillis();
        this.cacheCleanup = scheduledExecutorService.scheduleAtFixedRate(this.cache::cleanUp, cleanupIntervalMillis, cleanupIntervalMillis, TimeUnit.MILLISECONDS);

        this.reportingDuration = duration;
        this.store = streamSegmentStore;
        this.reporter = autoScaleProcessor;
        this.appendQueue = new BlockingDrainingQueue<>();
        startAppendQueueProcessor();
    }

    /**
     * Starts an endless asynchronous loop that takes up to {@code MAX_APPEND_QUEUE_PROCESS_BATCH_SIZE} items
     * from the append queue and hands each batch to {@link #processAppendInfo} on the executor.
     * When the loop terminates exceptionally, the failure is logged unless its cause is an
     * {@link ObjectClosedException} or a {@link CancellationException}.
     */
    private void startAppendQueueProcessor() {
        Futures.loop(
                () -> true,
                () -> this.appendQueue.take(MAX_APPEND_QUEUE_PROCESS_BATCH_SIZE)
                        .thenAcceptAsync(this::processAppendInfo, (Executor) this.executor),
                this.executor)
                .exceptionally(failure -> {
                    final Throwable cause = Exceptions.unwrap(failure);
                    // NOTE(review): presumably these two are what a closed queue / cancelled take produce on shutdown.
                    final boolean silent = cause instanceof ObjectClosedException || cause instanceof CancellationException;
                    if (!silent) {
                        log.error("SegmentStatsRecorder append queue processor failed. ", cause);
                    }
                    return null;
                });
    }

    /**
     * Closes the append queue, cancels the scheduled cache clean-up and closes all stats loggers and
     * global counters owned by this recorder.
     */
    @Override
    public void close() {
        this.appendQueue.close();
        this.cacheCleanup.cancel(true);

        final OpStatsLogger[] statsLoggers = {
            this.createStreamSegment,
            this.readStreamSegment,
            this.writeStreamSegment,
            this.appendSizeDistribution,
            this.readSizeDistribution,
        };
        for (OpStatsLogger statsLogger : statsLoggers) {
            statsLogger.close();
        }

        final Counter[] globalCounters = {
            this.globalSegmentWriteEvents,
            this.globalSegmentWriteBytes,
            this.globalSegmentReadBytes,
        };
        for (Counter counter : globalCounters) {
            counter.close();
        }
    }

    /**
     * Looks up the cached write context of a segment.
     * On a cache miss for a non-transaction segment an asynchronous load is triggered; the current call
     * still returns {@code null}.
     *
     * @param str segment name
     * @return the cached context, or {@code null} if none is cached
     */
    private SegmentWriteContext getWriteContext(String str) {
        final SegmentWriteContext cached = (SegmentWriteContext) this.cache.get(str);
        if (cached != null) {
            return cached;
        }
        if (!NameUtils.isTransactionSegment(str)) {
            loadAsynchronously(str);
        }
        return null;
    }

    /**
     * Loads a segment's scale policy attributes from the store and, when both are present (non-negative),
     * caches a new write context for it. At most one load per segment is in flight at a time.
     *
     * <p>The segment is removed from the pending set when the load finishes, whether it succeeded or
     * failed. Previously a failed lookup left the name in the pending set forever, so the segment could
     * never be loaded again.
     *
     * @param str segment name
     * @return a future that completes when the load has finished (exceptionally if the lookup failed), or an
     *         already-completed future when no load was started
     */
    @VisibleForTesting
    protected CompletableFuture<Void> loadAsynchronously(String str) {
        if (this.store == null || !this.pendingCacheLoads.add(str)) {
            return CompletableFuture.completedFuture(null);
        }

        final CompletableFuture<Void> load;
        try {
            load = this.store.getStreamSegmentInfo(str, TIMEOUT).thenAcceptAsync(segmentProperties -> {
                long policyType = ((Long) segmentProperties.getAttributes().getOrDefault(Attributes.SCALE_POLICY_TYPE, Long.MIN_VALUE)).longValue();
                long policyRate = ((Long) segmentProperties.getAttributes().getOrDefault(Attributes.SCALE_POLICY_RATE, Long.MIN_VALUE)).longValue();
                // Long.MIN_VALUE marks a missing attribute; only segments with both attributes are cached.
                if (policyType >= 0 && policyRate >= 0) {
                    this.cache.put(str, new SegmentWriteContext(str, SegmentAggregates.forPolicy(ScaleType.fromValue((byte) policyType), (int) policyRate)));
                }
            }, (Executor) this.executor);
        } catch (RuntimeException e) {
            // The store call failed synchronously: release the pending marker before propagating.
            this.pendingCacheLoads.remove(str);
            throw e;
        }

        // Release the pending marker on success and on failure alike; the outcome is passed through unchanged.
        return load.whenComplete((ignored, failure) -> this.pendingCacheLoads.remove(str));
    }

    /**
     * Records a segment creation: reports the creation latency, caches a write context for
     * non-transaction segments and notifies the reporter when the policy has scaling enabled.
     *
     * @param str      segment name
     * @param b        scale type value
     * @param i        target rate
     * @param duration time the creation took
     */
    @Override
    public void createSegment(String str, byte b, int i, Duration duration) {
        this.createStreamSegment.reportSuccessEvent(duration);

        final SegmentAggregates aggregates = SegmentAggregates.forPolicy(ScaleType.fromValue(b), i);
        final boolean transaction = NameUtils.isTransactionSegment(str);
        if (!transaction) {
            final SegmentWriteContext context = new SegmentWriteContext(str, aggregates);
            this.cache.put(str, context);
        }

        if (aggregates.isScalingEnabled()) {
            this.reporter.notifyCreated(str);
        }
    }

    /**
     * Records a segment deletion. Transaction segments are ignored; for any other segment the cached write
     * context is dropped and closed, and the segment's read_bytes counter is frozen.
     *
     * @param str segment name
     */
    @Override
    public void deleteSegment(String str) {
        if (!NameUtils.isTransactionSegment(str)) {
            segmentClosedForWrites(str);
            final String[] tags = MetricsTags.segmentTags(str);
            this.dynamicLogger.freezeCounter("pravega.segmentstore.segment.read_bytes", tags);
        }
    }

    /**
     * Records that a segment was sealed: removes its write context from the cache and, if one was cached,
     * notifies the reporter. The context's counters are closed for non-transaction segments only.
     *
     * <p>The cache entry is removed exactly once. Previously a non-transaction segment had its entry removed
     * (and closed) first, after which a second {@code cache.remove} decided whether to notify; that second
     * removal could no longer find the entry, so {@code notifySealed} was effectively never invoked for
     * regular segments.
     *
     * @param str segment name
     */
    @Override
    public void sealSegment(String str) {
        final SegmentWriteContext removed = (SegmentWriteContext) this.cache.remove(str);
        if (removed == null) {
            // Nothing was tracked for this segment.
            return;
        }
        if (!NameUtils.isTransactionSegment(str)) {
            removed.close();
        }
        this.reporter.notifySealed(str);
    }

    /**
     * Removes the segment's write context from the cache and closes it, if one was cached.
     *
     * @param str segment name
     */
    private void segmentClosedForWrites(String str) {
        final SegmentWriteContext removed = (SegmentWriteContext) this.cache.remove(str);
        if (removed == null) {
            return;
        }
        removed.close();
    }

    /**
     * Applies a scale policy change to a cached segment. If the scale type is unchanged only the target rate
     * is updated; otherwise the segment's aggregates are replaced with fresh ones for the new policy.
     * Does nothing when no write context is cached for the segment.
     *
     * @param str segment name
     * @param b   new scale type value
     * @param i   new target rate
     */
    @Override
    public void policyUpdate(String str, byte b, int i) {
        final SegmentWriteContext context = getWriteContext(str);
        if (context == null) {
            return;
        }

        final SegmentAggregates current = context.getSegmentAggregates();
        final boolean sameScaleType = current.getScaleType().getValue() == b;
        if (sameScaleType) {
            current.setTargetRate(i);
        } else {
            context.setSegmentAggregates(SegmentAggregates.forPolicy(ScaleType.fromValue(b), i));
        }
    }

    /**
     * Queues an append for later batch processing by the append queue processor.
     *
     * @param str      segment name
     * @param j        bytes appended
     * @param i        events appended
     * @param duration time the append took
     */
    @Override
    public void recordAppend(String str, long j, int i, Duration duration) {
        final AppendInfo info = new AppendInfo(str, j, i, duration);
        this.appendQueue.add(info);
    }

    /**
     * Processes one batch of queued appends: reports per-append latency and size, adds the batch totals to
     * the global counters and, for every non-transaction segment with a cached write context, updates that
     * segment's counters and aggregates (reporting to the auto-scale processor when due).
     *
     * <p>The whole method is guarded: any failure is logged and the batch is abandoned. Previously only the
     * drain loop was guarded, so an exception thrown while updating the counters or aggregates escaped into
     * the processing loop and stopped append statistics for good.
     *
     * @param queue batch of appends to process; it is drained by this call
     */
    private void processAppendInfo(Queue<AppendInfo> queue) {
        final Map<String, SegmentWrite> writesBySegment = new HashMap<>();
        long totalBytes = 0;
        int totalEvents = 0;
        try {
            while (!queue.isEmpty()) {
                final AppendInfo info = queue.poll();
                totalBytes += info.getDataLength();
                totalEvents += info.getNumOfEvents();
                this.writeStreamSegment.reportSuccessEvent(info.getElapsed());
                this.appendSizeDistribution.reportSuccessValue(info.getDataLength());

                // Transaction segments only contribute to the global totals.
                final String segmentName = info.getStreamSegmentName();
                if (!NameUtils.isTransactionSegment(segmentName)) {
                    final SegmentWrite write = writesBySegment.computeIfAbsent(segmentName, name -> new SegmentWrite());
                    write.bytes += info.getDataLength();
                    write.events += info.getNumOfEvents();
                }
            }

            this.globalSegmentWriteBytes.add(totalBytes);
            this.globalSegmentWriteEvents.add(totalEvents);

            for (Map.Entry<String, SegmentWrite> entry : writesBySegment.entrySet()) {
                final String segmentName = entry.getKey();
                final SegmentWrite write = entry.getValue();
                // A cache miss triggers an asynchronous load; this batch is then not attributed to the segment.
                final SegmentWriteContext context = getWriteContext(segmentName);
                if (context != null) {
                    context.recordWrite(write.bytes, write.events);
                    final SegmentAggregates aggregates = context.getSegmentAggregates();
                    if (aggregates.update(write.bytes, write.events)) {
                        reportIfNeeded(segmentName, aggregates);
                    }
                }
            }
        } catch (Exception e) {
            log.warn("Record statistic failed", e);
        }
    }

    /**
     * Records a transaction merge into a segment: adds the merged bytes and events to the segment's
     * counters and transaction aggregates, and schedules a report on the executor when one is due.
     * Does nothing when no write context is cached for the segment.
     *
     * @param str segment name
     * @param j   bytes merged
     * @param i   events merged
     * @param j2  third argument forwarded to {@code SegmentAggregates.updateTx}
     *            (NOTE(review): presumably the transaction creation time; confirm against the interface)
     */
    @Override
    public void merge(String str, long j, int i, long j2) {
        final SegmentWriteContext context = getWriteContext(str);
        if (context == null) {
            return;
        }

        context.recordWrite(j, i);
        final SegmentAggregates aggregates = context.getSegmentAggregates();
        final boolean updated = aggregates.updateTx(j, i, j2);
        if (updated) {
            reportIfNeededAsync(str, aggregates);
        }
    }

    /**
     * Reports the given duration as a successful event on the read latency stats logger.
     *
     * @param duration time the read took
     */
    @Override // io.pravega.segmentstore.server.host.stat.SegmentStatsRecorder
    public void readComplete(Duration duration) {
        this.readStreamSegment.reportSuccessEvent(duration);
    }

    /**
     * Records a read: adds the byte count to the global and per-segment read_bytes counters and to the
     * read size distribution.
     *
     * @param str segment name
     * @param i   number of bytes read
     */
    @Override
    public void read(String str, int i) {
        this.globalSegmentReadBytes.add(i);

        final String[] tags = MetricsTags.segmentTags(str);
        this.dynamicLogger.incCounterValue("pravega.segmentstore.segment.read_bytes", i, tags);

        this.readSizeDistribution.reportSuccessValue(i);
    }

    /**
     * Submits a report for the segment to the executor if the aggregates indicate one is due for the
     * configured reporting duration.
     *
     * @param str               segment name
     * @param segmentAggregates aggregates to report
     */
    private void reportIfNeededAsync(String str, SegmentAggregates segmentAggregates) {
        if (!segmentAggregates.reportIfNeeded(this.reportingDuration)) {
            return;
        }
        this.executor.execute(() -> report(str, segmentAggregates));
    }

    /**
     * Reports the segment on the calling thread if the aggregates indicate a report is due for the
     * configured reporting duration.
     *
     * @param str               segment name
     * @param segmentAggregates aggregates to report
     */
    private void reportIfNeeded(String str, SegmentAggregates segmentAggregates) {
        final boolean due = segmentAggregates.reportIfNeeded(this.reportingDuration);
        if (!due) {
            return;
        }
        report(str, segmentAggregates);
    }

    /**
     * Sends the segment's target rate, start time and its two/five/ten/twenty minute rates to the reporter.
     * Any exception is logged and swallowed.
     *
     * @param str               segment name
     * @param segmentAggregates aggregates whose values are reported
     */
    private void report(String str, SegmentAggregates segmentAggregates) {
        try {
            this.reporter.report(
                    str,
                    segmentAggregates.getTargetRate(),
                    segmentAggregates.getStartTime(),
                    segmentAggregates.getTwoMinuteRate(),
                    segmentAggregates.getFiveMinuteRate(),
                    segmentAggregates.getTenMinuteRate(),
                    segmentAggregates.getTwentyMinuteRate());
        } catch (Exception ex) {
            log.error("Unable to report Segment Aggregates for '{}'.", str, ex);
        }
    }

    /**
     * Returns the aggregates of a cached segment without triggering a load.
     *
     * @param str segment name
     * @return the segment's aggregates, or {@code null} when no write context is cached for it
     */
    @VisibleForTesting
    SegmentAggregates getSegmentAggregates(String str) {
        final SegmentWriteContext context = (SegmentWriteContext) this.cache.get(str);
        return context == null ? null : context.getSegmentAggregates();
    }
}
