package org.logstash.ext;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.jruby.Ruby;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.execution.queue.QueueWriter;
import org.logstash.ext.JrubyEventExtLibrary;
import org.logstash.instrument.metrics.AbstractMetricExt;
import org.logstash.instrument.metrics.AbstractNamespacedMetricExt;
import org.logstash.instrument.metrics.MetricKeys;
import org.logstash.instrument.metrics.counter.LongCounter;

@JRubyClass(name = {"WrappedWriteClient"})
/* loaded from: input_file:org/logstash/ext/JRubyWrappedWriteClientExt.class */
public final class JRubyWrappedWriteClientExt extends RubyObject implements QueueWriter {
    private static final long serialVersionUID = 1;
    private static final RubySymbol PUSH_DURATION_KEY = RubyUtil.RUBY.newSymbol("queue_push_duration_in_millis");
    private JRubyAbstractQueueWriteClientExt writeClient;
    private LongCounter eventsMetricsCounter;
    private LongCounter eventsMetricsTime;
    private LongCounter pipelineMetricsCounter;
    private LongCounter pipelineMetricsTime;
    private LongCounter pluginMetricsCounter;
    private LongCounter pluginMetricsTime;

    public JRubyWrappedWriteClientExt(Ruby ruby, RubyClass rubyClass) {
        super(ruby, rubyClass);
    }

    @JRubyMethod(required = 4)
    public JRubyWrappedWriteClientExt initialize(ThreadContext threadContext, IRubyObject[] iRubyObjectArr) {
        return initialize((JRubyAbstractQueueWriteClientExt) iRubyObjectArr[0], iRubyObjectArr[1].asJavaString(), (AbstractMetricExt) iRubyObjectArr[2], iRubyObjectArr[3]);
    }

    public JRubyWrappedWriteClientExt initialize(JRubyAbstractQueueWriteClientExt jRubyAbstractQueueWriteClientExt, String str, AbstractMetricExt abstractMetricExt, IRubyObject iRubyObject) {
        this.writeClient = jRubyAbstractQueueWriteClientExt;
        synchronized (abstractMetricExt) {
            AbstractNamespacedMetricExt metric = getMetric(abstractMetricExt, "stats", "events");
            this.eventsMetricsCounter = LongCounter.fromRubyBase(metric, MetricKeys.IN_KEY);
            this.eventsMetricsTime = LongCounter.fromRubyBase(metric, PUSH_DURATION_KEY);
            AbstractNamespacedMetricExt metric2 = getMetric(abstractMetricExt, "stats", "pipelines", str, "events");
            this.pipelineMetricsCounter = LongCounter.fromRubyBase(metric2, MetricKeys.IN_KEY);
            this.pipelineMetricsTime = LongCounter.fromRubyBase(metric2, PUSH_DURATION_KEY);
            AbstractNamespacedMetricExt metric3 = getMetric(abstractMetricExt, "stats", "pipelines", str, "plugins", "inputs", iRubyObject.asJavaString(), "events");
            this.pluginMetricsCounter = LongCounter.fromRubyBase(metric3, MetricKeys.OUT_KEY);
            this.pluginMetricsTime = LongCounter.fromRubyBase(metric3, PUSH_DURATION_KEY);
        }
        return this;
    }

    @JRubyMethod(name = {"push", "<<"}, required = 1)
    public IRubyObject push(ThreadContext threadContext, IRubyObject iRubyObject) throws InterruptedException {
        long nanoTime = System.nanoTime();
        incrementCounters(serialVersionUID);
        JRubyAbstractQueueWriteClientExt doPush = this.writeClient.doPush(threadContext, (JrubyEventExtLibrary.RubyEvent) iRubyObject);
        incrementTimers(nanoTime);
        return doPush;
    }

    @JRubyMethod(name = {"push_batch"}, required = 1)
    public IRubyObject pushBatch(ThreadContext threadContext, IRubyObject iRubyObject) throws InterruptedException {
        long nanoTime = System.nanoTime();
        incrementCounters(((Collection) iRubyObject).size());
        JRubyAbstractQueueWriteClientExt doPushBatch = this.writeClient.doPushBatch(threadContext, (Collection) iRubyObject);
        incrementTimers(nanoTime);
        return doPushBatch;
    }

    @JRubyMethod(name = {"get_new_batch"})
    @Deprecated
    public IRubyObject newBatch(ThreadContext threadContext) {
        return threadContext.runtime.newArray();
    }

    private void incrementCounters(long j) {
        this.eventsMetricsCounter.increment(j);
        this.pipelineMetricsCounter.increment(j);
        this.pluginMetricsCounter.increment(j);
    }

    private void incrementTimers(long j) {
        long convert = TimeUnit.MILLISECONDS.convert(System.nanoTime() - j, TimeUnit.NANOSECONDS);
        this.eventsMetricsTime.increment(convert);
        this.pipelineMetricsTime.increment(convert);
        this.pluginMetricsTime.increment(convert);
    }

    private static AbstractNamespacedMetricExt getMetric(AbstractMetricExt abstractMetricExt, String... strArr) {
        return abstractMetricExt.namespace(RubyUtil.RUBY.getCurrentContext(), toSymbolArray(strArr));
    }

    private static IRubyObject toSymbolArray(String... strArr) {
        IRubyObject[] iRubyObjectArr = new IRubyObject[strArr.length];
        for (int i = 0; i < strArr.length; i++) {
            iRubyObjectArr[i] = RubyUtil.RUBY.newSymbol(strArr[i]);
        }
        return RubyUtil.RUBY.newArray(iRubyObjectArr);
    }

    @Override // org.logstash.execution.queue.QueueWriter
    public void push(Map<String, Object> map) {
        long nanoTime = System.nanoTime();
        incrementCounters(serialVersionUID);
        this.writeClient.push(map);
        incrementTimers(nanoTime);
    }
}
