package org.apache.hadoop.hbase.regionserver.regionreplication;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.util.StringUtils;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the total number of pending bytes reported through {@link #increase(long)} and
 * {@link #decrease(long)} and reacts when that total crosses one of two thresholds:
 * <ul>
 * <li>a soft limit ({@link #SOFT_LIMIT_PERCENTAGE} of the hard limit): a background flush is
 * scheduled for the online region whose {@link RegionReplicationSink} reports the largest pending
 * size;</li>
 * <li>a hard limit ({@link #MAX_PENDING_SIZE}): {@link #increase(long)} returns {@code false}.</li>
 * </ul>
 * The running total is kept in an {@link AtomicLong}. Flushes run on a dedicated single daemon
 * thread; a flush request that arrives while another flush is still running is dropped.
 */
@InterfaceAudience.Private
public class RegionReplicationBufferManager {
    private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationBufferManager.class);

    /** Configuration key for the hard limit on the total pending size, in bytes. */
    public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";

    /** Default hard limit: 100 MiB. */
    public static final long MAX_PENDING_SIZE_DEFAULT = 104857600;

    /** Configuration key for the soft limit, expressed as a fraction of the hard limit. */
    public static final String SOFT_LIMIT_PERCENTAGE = "hbase.region.read-replica.sink.max-pending-size.soft-limit-percentage";

    /** Default soft limit fraction. */
    public static final float SOFT_LIMIT_PERCENTAGE_DEFAULT = 0.8f;

    private final RegionServerServices rsServices;

    /** Hard limit; {@link #increase(long)} returns {@code false} once the total exceeds it. */
    private final long maxPendingSize;

    /** Soft limit; exceeding it schedules a background flush. */
    private final long softMaxPendingSize;

    /** Running total of everything added via increase() minus everything removed via decrease(). */
    private final AtomicLong pendingSize = new AtomicLong();

    /**
     * Single-thread executor with a hand-off queue: there is no room to queue a second task, so a
     * submission made while a flush is running goes to the rejection handler, which only logs.
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 1, TimeUnit.SECONDS,
        new SynchronousQueue<>(),
        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Region-Replication-Flusher-%d").build(),
        (runnable, threadPoolExecutor) -> LOG.debug("A flush task is ongoing, drop the new scheduled one"));

    /**
     * Reads the hard and soft limits from the region server configuration.
     *
     * @param regionServerServices supplies the configuration and the list of regions to inspect
     *        when a flush is needed
     */
    public RegionReplicationBufferManager(RegionServerServices regionServerServices) {
        this.rsServices = regionServerServices;
        this.maxPendingSize = regionServerServices.getConfiguration()
            .getLong(MAX_PENDING_SIZE, MAX_PENDING_SIZE_DEFAULT);
        // The product is computed in float and then truncated to whole bytes.
        this.softMaxPendingSize = (long) (regionServerServices.getConfiguration()
            .getFloat(SOFT_LIMIT_PERCENTAGE, SOFT_LIMIT_PERCENTAGE_DEFAULT) * this.maxPendingSize);
        // Let the single flusher thread die after the 1 second keep-alive when idle.
        this.executor.allowCoreThreadTimeOut(true);
    }

    /**
     * Flushes the region whose replication sink currently reports the largest pending size.
     * Regions without a replication sink are ignored. Failures are logged, never propagated.
     */
    private void flush() {
        long max = Long.MIN_VALUE;
        HRegion toFlush = null;
        for (HRegion region : this.rsServices.getRegions()) {
            Optional<RegionReplicationSink> sink = region.getRegionReplicationSink();
            if (sink.isPresent()) {
                long regionPendingSize = sink.get().pendingSize();
                if (regionPendingSize > max) {
                    max = regionPendingSize;
                    toFlush = region;
                }
            }
        }
        if (toFlush == null) {
            // No region exposed a replication sink at the time of the scan.
            LOG.warn("Can not find a region to flush");
            return;
        }
        try {
            LOG.info("Going to flush {} with {} pending entry size", toFlush.getRegionInfo(),
                StringUtils.TraditionalBinaryPrefix.long2String(max, "", 1));
            HRegion.FlushResultImpl result = toFlush.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
            if (!result.isFlushSucceeded()) {
                LOG.warn("Failed to flush {}, the result is {}", toFlush.getRegionInfo(), result.getResult());
            }
        } catch (IOException e) {
            LOG.warn("Failed to flush {}", toFlush.getRegionInfo(), e);
        }
    }

    /**
     * Adds {@code j} bytes to the running total. If the new total is above the soft limit, a
     * background flush is requested (and silently dropped if one is already running).
     *
     * @param j number of bytes to add
     * @return {@code true} if the new total is still within the hard limit, {@code false} if it
     *         exceeds it; the size is added to the total in both cases
     */
    public boolean increase(long j) {
        long total = this.pendingSize.addAndGet(j);
        if (total > this.softMaxPendingSize) {
            this.executor.execute(this::flush);
        }
        return total <= this.maxPendingSize;
    }

    /**
     * Subtracts {@code j} bytes from the running total.
     *
     * @param j number of bytes to remove
     */
    public void decrease(long j) {
        this.pendingSize.addAndGet(-j);
    }

    /**
     * Shuts down the flusher executor. A flush that is already running is allowed to finish;
     * flush requests made afterwards are handed to the rejection handler and dropped.
     */
    public void stop() {
        this.executor.shutdown();
    }
}
