package io.pravega.segmentstore.server.host.stat;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalListeners;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.ClientFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.impl.JavaSerializer;
import io.pravega.common.util.Retry;
import io.pravega.shared.controller.event.AutoScaleEvent;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/segmentstore/server/host/stat/AutoScaleProcessor.class */
public class AutoScaleProcessor {

    @SuppressFBWarnings(justification = "generated code")
    private static final Logger log = LoggerFactory.getLogger(AutoScaleProcessor.class);
    private static final long TWO_MINUTES = Duration.ofMinutes(2).toMillis();
    private static final long FIVE_MINUTES = Duration.ofMinutes(5).toMillis();
    private static final long TEN_MINUTES = Duration.ofMinutes(10).toMillis();
    private static final long TWENTY_MINUTES = Duration.ofMinutes(20).toMillis();
    private static final int MAX_CACHE_SIZE = 1000000;
    private static final int INITIAL_CAPACITY = 1000;
    private final AtomicReference<ClientFactory> clientFactory;
    private final AtomicBoolean initialized;
    private final Cache<String, Pair<Long, Long>> cache;
    private final Serializer<AutoScaleEvent> serializer;
    private final AtomicReference<EventStreamWriter<AutoScaleEvent>> writer;
    private final EventWriterConfig writerConfig;
    private final AutoScalerConfig configuration;
    private final ScheduledExecutorService maintenanceExecutor;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AutoScaleProcessor(AutoScalerConfig autoScalerConfig, ScheduledExecutorService scheduledExecutorService) {
        this.clientFactory = new AtomicReference<>();
        this.initialized = new AtomicBoolean(false);
        this.configuration = autoScalerConfig;
        this.maintenanceExecutor = scheduledExecutorService;
        this.serializer = new JavaSerializer();
        this.writerConfig = EventWriterConfig.builder().build();
        this.writer = new AtomicReference<>();
        this.cache = CacheBuilder.newBuilder().initialCapacity(INITIAL_CAPACITY).maximumSize(1000000L).expireAfterAccess(autoScalerConfig.getCacheExpiry().getSeconds(), TimeUnit.SECONDS).removalListener(RemovalListeners.asynchronous(removalNotification -> {
            if (removalNotification.getCause().equals(RemovalCause.EXPIRED)) {
                triggerScaleDown((String) removalNotification.getKey(), true);
            }
        }, scheduledExecutorService)).build();
        CompletableFuture.runAsync(this::bootstrapRequestWriters, scheduledExecutorService);
    }

    @VisibleForTesting
    AutoScaleProcessor(EventStreamWriter<AutoScaleEvent> eventStreamWriter, AutoScalerConfig autoScalerConfig, ScheduledExecutorService scheduledExecutorService) {
        this(autoScalerConfig, scheduledExecutorService);
        this.writer.set(eventStreamWriter);
        this.initialized.set(true);
        Cache<String, Pair<Long, Long>> cache = this.cache;
        cache.getClass();
        scheduledExecutorService.scheduleAtFixedRate(cache::cleanUp, 0L, autoScalerConfig.getCacheCleanup().getSeconds(), TimeUnit.SECONDS);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    public AutoScaleProcessor(AutoScalerConfig autoScalerConfig, ClientFactory clientFactory, ScheduledExecutorService scheduledExecutorService) {
        this(autoScalerConfig, scheduledExecutorService);
        this.clientFactory.set(clientFactory);
    }

    private void bootstrapRequestWriters() {
        CompletableFuture completableFuture = new CompletableFuture();
        this.maintenanceExecutor.schedule(() -> {
            return Retry.indefinitelyWithExpBackoff(100L, 10, 10000L, th -> {
                log.warn("error while creating writer for requeststream");
                log.debug("error while creating writer for requeststream {}", th);
            }).runAsync(() -> {
                if (this.clientFactory.get() == null) {
                    this.clientFactory.compareAndSet(null, ClientFactory.withScope("_system", this.configuration.getControllerUri()));
                }
                this.writer.set(this.clientFactory.get().createEventWriter(this.configuration.getInternalRequestStream(), this.serializer, this.writerConfig));
                this.initialized.set(true);
                ScheduledExecutorService scheduledExecutorService = this.maintenanceExecutor;
                Cache<String, Pair<Long, Long>> cache = this.cache;
                cache.getClass();
                scheduledExecutorService.scheduleAtFixedRate(cache::cleanUp, 0L, this.configuration.getCacheCleanup().getSeconds(), TimeUnit.SECONDS);
                log.info("bootstrapping auto-scale reporter done");
                completableFuture.complete(null);
                return completableFuture;
            }, this.maintenanceExecutor);
        }, 10L, TimeUnit.SECONDS);
    }

    private void triggerScaleUp(String str, int i) {
        if (this.initialized.get()) {
            Pair pair = (Pair) this.cache.getIfPresent(str);
            long j = 0;
            if (pair != null && pair.getKey() != null) {
                j = ((Long) pair.getKey()).longValue();
            }
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - j > this.configuration.getMuteDuration().toMillis()) {
                log.info("sending request for scale up for {}", str);
                Segment fromScopedName = Segment.fromScopedName(str);
                writeRequest(new AutoScaleEvent(fromScopedName.getScope(), fromScopedName.getStreamName(), fromScopedName.getSegmentNumber(), (byte) 0, currentTimeMillis, i, false)).thenAccept(r12 -> {
                    this.cache.put(str, new ImmutablePair(Long.valueOf(currentTimeMillis), Long.valueOf(currentTimeMillis)));
                });
            }
        }
    }

    private void triggerScaleDown(String str, boolean z) {
        if (this.initialized.get()) {
            Pair pair = (Pair) this.cache.getIfPresent(str);
            long j = 0;
            if (pair != null && pair.getValue() != null) {
                j = ((Long) pair.getValue()).longValue();
            }
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis - j > this.configuration.getMuteDuration().toMillis()) {
                log.info("sending request for scale down for {}", str);
                Segment fromScopedName = Segment.fromScopedName(str);
                writeRequest(new AutoScaleEvent(fromScopedName.getScope(), fromScopedName.getStreamName(), fromScopedName.getSegmentNumber(), (byte) 1, currentTimeMillis, 0, z)).thenAccept(r13 -> {
                    if (z) {
                        return;
                    }
                    this.cache.put(str, new ImmutablePair(0L, Long.valueOf(currentTimeMillis)));
                });
            }
        }
    }

    private CompletableFuture<Void> writeRequest(AutoScaleEvent autoScaleEvent) {
        return this.writer.get().writeEvent(autoScaleEvent.getKey(), autoScaleEvent).whenComplete((r4, th) -> {
            if (th != null) {
                log.error("error sending request to requeststream {}", th);
            } else {
                log.debug("scale event posted successfully");
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void report(String str, long j, byte b, long j2, double d, double d2, double d3, double d4) {
        log.info("received traffic for {} with twoMinute rate = {} and targetRate = {}", new Object[]{str, Double.valueOf(d), Long.valueOf(j)});
        if (!this.initialized.get() || b == 0) {
            return;
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - j2 > this.configuration.getCooldownDuration().toMillis()) {
            log.debug("cool down period elapsed for {}", str);
            if ((d > 5.0d * j && currentTimeMillis - j2 > TWO_MINUTES) || ((d2 > 2.0d * j && currentTimeMillis - j2 > FIVE_MINUTES) || (d3 > j && currentTimeMillis - j2 > TEN_MINUTES))) {
                int max = Math.max(2, (int) (Double.max(Double.max(d, d2), d3) / j));
                log.debug("triggering scale up for {} with number of splits {}", str, Integer.valueOf(max));
                triggerScaleUp(str, max);
            }
            if (d >= j || d2 >= j || d3 >= j || d4 >= j / 2.0d || currentTimeMillis - j2 <= TWENTY_MINUTES) {
                return;
            }
            log.debug("triggering scale down for {}", str);
            triggerScaleDown(str, false);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifyCreated(String str, byte b, long j) {
        if (b != 0) {
            this.cache.put(str, new ImmutablePair(Long.valueOf(System.currentTimeMillis()), Long.valueOf(System.currentTimeMillis())));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void notifySealed(String str) {
        this.cache.invalidate(str);
    }

    @VisibleForTesting
    void put(String str, ImmutablePair<Long, Long> immutablePair) {
        this.cache.put(str, immutablePair);
    }

    @VisibleForTesting
    Pair<Long, Long> get(String str) {
        return (Pair) this.cache.getIfPresent(str);
    }
}
