package org.logstash.execution;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyNumeric;
import org.jruby.RubyObject;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.java.proxies.JavaProxy;
import org.jruby.javasupport.JavaObject;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;

@JRubyClass(name = {"QueueReadClientBase"})
/* loaded from: input_file:org/logstash/execution/QueueReadClientBase.class */
public abstract class QueueReadClientBase extends RubyObject implements QueueReadClient {
    private static final long serialVersionUID = 1;
    protected int batchSize;
    protected long waitForNanos;
    protected long waitForMillis;
    private final ConcurrentHashMap<Long, QueueBatch> inflightBatches;
    private final ConcurrentHashMap<Long, Long> inflightClocks;
    private LongCounter eventMetricOut;
    private LongCounter eventMetricFiltered;
    private LongCounter eventMetricTime;
    private LongCounter pipelineMetricOut;
    private LongCounter pipelineMetricFiltered;
    private LongCounter pipelineMetricTime;

    /* JADX INFO: Access modifiers changed from: protected */
    public QueueReadClientBase(Ruby ruby, RubyClass rubyClass) {
        super(ruby, rubyClass);
        this.batchSize = 125;
        this.waitForNanos = 50000000L;
        this.waitForMillis = 50L;
        this.inflightBatches = new ConcurrentHashMap<>();
        this.inflightClocks = new ConcurrentHashMap<>();
    }

    @JRubyMethod(name = {"inflight_batches"})
    public RubyHash rubyGetInflightBatches(ThreadContext threadContext) {
        RubyHash newHash = RubyHash.newHash(threadContext.runtime);
        newHash.putAll(this.inflightBatches);
        return newHash;
    }

    @JRubyMethod(name = {"set_events_metric"})
    public IRubyObject setEventsMetric(IRubyObject iRubyObject) {
        AbstractNamespacedMetricExt abstractNamespacedMetricExt = (AbstractNamespacedMetricExt) iRubyObject;
        synchronized (abstractNamespacedMetricExt.getMetric()) {
            this.eventMetricOut = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.OUT_KEY);
            this.eventMetricFiltered = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.FILTERED_KEY);
            this.eventMetricTime = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.DURATION_IN_MILLIS_KEY);
        }
        return this;
    }

    @JRubyMethod(name = {"set_pipeline_metric"})
    public IRubyObject setPipelineMetric(IRubyObject iRubyObject) {
        AbstractNamespacedMetricExt abstractNamespacedMetricExt = (AbstractNamespacedMetricExt) iRubyObject;
        synchronized (abstractNamespacedMetricExt.getMetric()) {
            this.pipelineMetricOut = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.OUT_KEY);
            this.pipelineMetricFiltered = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.FILTERED_KEY);
            this.pipelineMetricTime = LongCounter.fromRubyBase(abstractNamespacedMetricExt, MetricKeys.DURATION_IN_MILLIS_KEY);
        }
        return this;
    }

    @JRubyMethod(name = {"set_batch_dimensions"})
    public IRubyObject rubySetBatchDimensions(IRubyObject iRubyObject, IRubyObject iRubyObject2) {
        setBatchDimensions(((RubyNumeric) iRubyObject).getIntValue(), ((RubyNumeric) iRubyObject2).getIntValue());
        return this;
    }

    public void setBatchDimensions(int i, int i2) {
        this.batchSize = i;
        this.waitForNanos = TimeUnit.NANOSECONDS.convert(i2, TimeUnit.MILLISECONDS);
        this.waitForMillis = i2;
    }

    @JRubyMethod(name = {"empty?"})
    public IRubyObject rubyIsEmpty(ThreadContext threadContext) {
        return threadContext.runtime.newBoolean(isEmpty());
    }

    @JRubyMethod(name = {"close"})
    public void rubyClose(ThreadContext threadContext) {
        try {
            close();
        } catch (IOException e) {
            throw RubyUtil.newRubyIOError(threadContext.runtime, e);
        }
    }

    @JRubyMethod(name = {"read_batch"})
    public IRubyObject rubyReadBatch(ThreadContext threadContext) throws InterruptedException {
        return JavaObject.wrap(threadContext.runtime, readBatch());
    }

    @Override // org.logstash.execution.QueueReadClient
    public void closeBatch(QueueBatch queueBatch) throws IOException {
        queueBatch.close();
        this.inflightBatches.remove(Long.valueOf(Thread.currentThread().getId()));
        Long remove = this.inflightClocks.remove(Long.valueOf(Thread.currentThread().getId()));
        if (remove == null || queueBatch.filteredSize() <= 0) {
            return;
        }
        long nanoTime = (System.nanoTime() - remove.longValue()) / 1000000;
        this.eventMetricTime.increment(nanoTime);
        this.pipelineMetricTime.increment(nanoTime);
    }

    @JRubyMethod(name = {"close_batch"})
    public void rubyCloseBatch(IRubyObject iRubyObject) throws IOException {
        closeBatch(extractQueueBatch(iRubyObject));
    }

    @JRubyMethod(name = {"start_metrics"})
    public void rubyStartMetrics(IRubyObject iRubyObject) {
        startMetrics(extractQueueBatch(iRubyObject));
    }

    private static QueueBatch extractQueueBatch(IRubyObject iRubyObject) {
        return iRubyObject instanceof JavaProxy ? (QueueBatch) ((JavaObject) iRubyObject.dataGetStruct()).getValue() : (QueueBatch) ((JavaObject) iRubyObject).getValue();
    }

    @JRubyMethod(name = {"add_filtered_metrics"})
    public void rubyAddFilteredMetrics(IRubyObject iRubyObject) {
        addFilteredMetrics(((RubyNumeric) iRubyObject).getIntValue());
    }

    @JRubyMethod(name = {"add_output_metrics"})
    public void rubyAddOutputMetrics(IRubyObject iRubyObject) {
        addOutputMetrics(((RubyNumeric) iRubyObject).getIntValue());
    }

    @Override // org.logstash.execution.QueueReadClient
    public void startMetrics(QueueBatch queueBatch) {
        long id = Thread.currentThread().getId();
        this.inflightBatches.put(Long.valueOf(id), queueBatch);
        this.inflightClocks.put(Long.valueOf(id), Long.valueOf(System.nanoTime()));
    }

    @Override // org.logstash.execution.QueueReadClient
    public void addFilteredMetrics(int i) {
        this.eventMetricFiltered.increment(i);
        this.pipelineMetricFiltered.increment(i);
    }

    @Override // org.logstash.execution.QueueReadClient
    public void addOutputMetrics(int i) {
        this.eventMetricOut.increment(i);
        this.pipelineMetricOut.increment(i);
    }

    public abstract void close() throws IOException;
}
