package org.rx.io;

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import lombok.NonNull;
import org.rx.bean.IntWaterMark;
import org.rx.bean.Tuple;
import org.rx.core.Disposable;
import org.rx.core.Extends;
import org.rx.core.ManualResetEvent;
import org.rx.core.Tasks;
import org.rx.core.TimeoutFlag;
import org.rx.util.function.BiAction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/rx/io/WriteBehindQueue.class */
/**
 * Buffer of pending write actions, keyed by {@code K} and kept in key order by a
 * {@link ConcurrentSkipListMap}.
 *
 * <p>{@link #offer} stores a value together with the action that writes it and schedules
 * {@link #consume()} through {@code Tasks}; {@link #consume()} then polls entries from the
 * lowest key upwards and invokes each action with its value. Offering a key that is already
 * pending replaces the previous value/action pair (plain {@code Map.put} semantics).
 *
 * <p>Back-pressure: when the number of pending entries exceeds
 * {@code funcWaterMark.getHigh()}, {@link #offer} blocks on {@code syncRoot} until
 * {@link #consume()} (or {@link #reset()}) signals it, which {@link #consume()} does once the
 * pending count is at or below {@code funcWaterMark.getLow()}.
 *
 * @param <K> key identifying a pending write (named "posKey" in the null-check messages)
 * @param <V> value handed to the write action
 */
public final class WriteBehindQueue<K, V> extends Disposable {
    private static final Logger log = LoggerFactory.getLogger(WriteBehindQueue.class);
    /** Delay passed to the scheduler for the deferred {@link #consume()} (unit defined by {@code Tasks} — presumably milliseconds, TODO confirm). */
    final long writeDelayed;
    /** High/low thresholds on the number of pending entries. */
    final IntWaterMark funcWaterMark;
    /** Pending writes: key -> (value, action that writes the value). */
    final ConcurrentSkipListMap<K, Tuple<V, BiAction<V>>> funcs;
    /** Event a producer blocked in {@link #offer} waits on; set by {@link #consume()} and {@link #reset()}. */
    final ManualResetEvent syncRoot;

    /**
     * Creates a queue whose water mark is built from {@code ceil(i / 2)} and {@code i}.
     *
     * @param j delay used when scheduling {@link #consume()}
     * @param i second water-mark argument; the first is derived as {@code ceil(i / 2)}
     *          (presumably these are the low and high thresholds respectively — confirm
     *          against {@code IntWaterMark}'s constructor)
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public WriteBehindQueue(long j, int i) {
        this(j, new IntWaterMark((int) Math.ceil(i / 2.0d), i));
    }

    /**
     * Creates a queue with an explicit water mark.
     *
     * @param j            delay used when scheduling {@link #consume()}
     * @param intWaterMark thresholds on the number of pending entries; must not be null
     * @throws NullPointerException if {@code intWaterMark} is null
     */
    WriteBehindQueue(long j, @NonNull IntWaterMark intWaterMark) {
        // Validate before allocating any state.
        if (intWaterMark == null) {
            throw new NullPointerException("funcWaterMark is marked non-null but is null");
        }
        this.funcs = new ConcurrentSkipListMap<>();
        this.syncRoot = new ManualResetEvent();
        this.writeDelayed = j;
        this.funcWaterMark = intWaterMark;
    }

    /**
     * Flushes whatever is still pending by running {@link #consume()}.
     * Presumably invoked by {@code Disposable} when the queue is closed — confirm in the base class.
     */
    @Override // org.rx.core.Disposable
    protected void freeObjects() {
        consume();
    }

    /**
     * Drops every pending entry without invoking its action, then signals {@code syncRoot}
     * so a producer waiting in {@link #offer} is released.
     */
    public void reset() {
        this.funcs.clear();
        this.syncRoot.set();
    }

    /**
     * Queues a write and schedules a deferred {@link #consume()}.
     *
     * <p>If the queue is already closed the action is invoked immediately on the calling
     * thread and nothing is queued. Otherwise the pair is stored under {@code k}; if the
     * pending count then exceeds the high water mark, an early {@link #consume()} is scheduled
     * on the timer and the caller blocks until {@code syncRoot} is signalled.
     *
     * @param k        key of the write; must not be null
     * @param v        value passed to {@code biAction} when the write is performed
     * @param biAction action that performs the write
     * @throws NullPointerException if {@code k} is null
     */
    public void offer(@NonNull K k, V v, BiAction<V> biAction) {
        if (k == null) {
            throw new NullPointerException("posKey is marked non-null but is null");
        }
        if (isClosed()) {
            // Nothing will drain the queue any more, so write through directly.
            biAction.invoke(v);
            return;
        }
        this.funcs.put(k, Tuple.of(v, biAction));
        if (this.funcs.size() > this.funcWaterMark.getHigh()) {
            log.warn("high water mark threshold");
            // The delay function yields 1 when handed 0 and writeDelayed for any other input.
            Tasks.timer().setTimeout(this::consume, j -> j == 0 ? 1L : this.writeDelayed, this, TimeoutFlag.SINGLE);
            // NOTE(review): the event is reset only after the wait returns. If ManualResetEvent
            // stays signalled after set() (as its name suggests), a signal left over from an
            // earlier consume()/reset() would let this wait return immediately — confirm.
            this.syncRoot.waitOne();
            this.syncRoot.reset();
            log.info("below low water mark");
        }
        Tasks.setTimeout(this::consume, this.writeDelayed, this, TimeoutFlag.SINGLE);
        log.debug("offer {} delay={}", k, this.writeDelayed);
    }

    /**
     * Removes the pending write stored under {@code k} without invoking its action.
     *
     * @param k key of the write; must not be null
     * @return {@code true} if an entry was pending for {@code k} and has been removed
     * @throws NullPointerException if {@code k} is null
     */
    public boolean remove(@NonNull K k) {
        if (k == null) {
            throw new NullPointerException("posKey is marked non-null but is null");
        }
        return this.funcs.remove(k) != null;
    }

    /**
     * Returns the value of the pending write stored under {@code k}, leaving it queued.
     *
     * @param k key of the write; must not be null
     * @return the pending value, or {@code null} if nothing is pending for {@code k}
     *         (a pending entry whose value is {@code null} also yields {@code null})
     * @throws NullPointerException if {@code k} is null
     */
    public V peek(@NonNull K k) {
        if (k == null) {
            throw new NullPointerException("posKey is marked non-null but is null");
        }
        Tuple<V, BiAction<V>> pending = this.funcs.get(k);
        if (pending == null) {
            return null;
        }
        return pending.left;
    }

    /**
     * Drains pending writes in ascending key order, invoking each action with its value.
     *
     * <p>At most as many entries are processed as were pending when the method was entered;
     * entries offered while draining are left for a later run. The method is
     * {@code synchronized}, so only one thread drains at a time. Each action runs inside
     * {@code Extends.quietly} (presumably so that a failing action does not abort the drain —
     * confirm against {@code Extends}).
     */
    public synchronized void consume() {
        for (int remaining = this.funcs.size(); remaining > 0; remaining--) {
            Map.Entry<K, Tuple<V, BiAction<V>>> entry = this.funcs.pollFirstEntry();
            // Signal before the null check so waiters are released even when the map
            // turned out to be empty.
            if (this.funcs.size() <= this.funcWaterMark.getLow()) {
                log.debug("low water mark threshold");
                this.syncRoot.set();
            }
            if (entry == null) {
                // Emptied concurrently (e.g. by remove/reset); nothing left to do.
                return;
            }
            Tuple<V, BiAction<V>> write = entry.getValue();
            // The field is already typed BiAction<V>; no raw-type cast is needed.
            Extends.quietly(() -> write.right.invoke(write.left));
            log.debug("consume {}", entry.getKey());
        }
    }

    /**
     * @return the delay used when scheduling {@link #consume()}
     */
    public long getWriteDelayed() {
        return this.writeDelayed;
    }

    /**
     * @return the thresholds on the number of pending entries
     */
    public IntWaterMark getFuncWaterMark() {
        return this.funcWaterMark;
    }
}
