package org.logstash.execution;

import java.util.Collection;
import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBasicObject;
import org.jruby.RubyClass;
import org.jruby.RubyHash;
import org.jruby.RubyString;
import org.jruby.RubySymbol;
import org.jruby.anno.JRubyClass;
import org.jruby.anno.JRubyMethod;
import org.jruby.runtime.Block;
import org.jruby.runtime.ThreadContext;
import org.jruby.runtime.builtin.IRubyObject;
import org.logstash.RubyUtil;
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;

@JRubyClass(name = {"PipelineReporter"})
/* loaded from: input_file:org/logstash/execution/PipelineReporterExt.class */
public final class PipelineReporterExt extends RubyBasicObject {
    private static final long serialVersionUID = 1;
    private static final RubySymbol EVENTS_FILTERED_KEY = RubyUtil.RUBY.newSymbol("events_filtered");
    private static final RubySymbol EVENTS_CONSUMED_KEY = RubyUtil.RUBY.newSymbol("events_consumed");
    private static final RubySymbol INFLIGHT_COUNT_KEY = RubyUtil.RUBY.newSymbol("inflight_count");
    private static final RubySymbol WORKER_STATES_KEY = RubyUtil.RUBY.newSymbol("worker_states");
    private static final RubySymbol OUTPUT_INFO_KEY = RubyUtil.RUBY.newSymbol("output_info");
    private static final RubySymbol THREAD_INFO_KEY = RubyUtil.RUBY.newSymbol("thread_info");
    private static final RubySymbol STALLING_THREADS_INFO_KEY = RubyUtil.RUBY.newSymbol("stalling_threads_info");
    private static final RubySymbol TYPE_KEY = RubyUtil.RUBY.newSymbol("type");
    private static final RubySymbol ID_KEY = RubyUtil.RUBY.newSymbol("id");
    private static final RubySymbol STATUS_KEY = RubyUtil.RUBY.newSymbol("status");
    private static final RubySymbol ALIVE_KEY = RubyUtil.RUBY.newSymbol("alive");
    private static final RubySymbol INDEX_KEY = RubyUtil.RUBY.newSymbol("index");
    private static final RubySymbol CONCURRENCY_KEY = RubyUtil.RUBY.newSymbol("concurrency");
    private static final RubyString DEAD_STATUS = RubyUtil.RUBY.newString("dead").newFrozen();
    private IRubyObject logger;
    private IRubyObject pipeline;

    @JRubyClass(name = {"Snapshot"})
    /* loaded from: input_file:org/logstash/execution/PipelineReporterExt$SnapshotExt.class */
    public static final class SnapshotExt extends RubyBasicObject {
        private static final long serialVersionUID = 1;
        private static final RubyString INFLIGHT_COUNT_KEY = RubyUtil.RUBY.newString("inflight_count").newFrozen();
        private static final RubyString STALLING_THREADS_KEY = RubyUtil.RUBY.newString("stalling_threads_info").newFrozen();
        private static final RubyString PLUGIN_KEY = RubyUtil.RUBY.newString("plugin").newFrozen();
        private static final RubyString OTHER_KEY = RubyUtil.RUBY.newString("other").newFrozen();
        private RubyHash data;

        public SnapshotExt(Ruby ruby, RubyClass rubyClass) {
            super(ruby, rubyClass);
        }

        @JRubyMethod
        public SnapshotExt initialize(IRubyObject iRubyObject) {
            this.data = (RubyHash) iRubyObject;
            return this;
        }

        @JRubyMethod(name = {"to_hash"})
        public RubyHash toHash() {
            return this.data;
        }

        @JRubyMethod(name = {"to_simple_hash"})
        public RubyHash toSimpleHash(ThreadContext threadContext) {
            RubyHash newHash = RubyHash.newHash(threadContext.runtime);
            newHash.op_aset(threadContext, INFLIGHT_COUNT_KEY, this.data.op_aref(threadContext, INFLIGHT_COUNT_KEY.intern()));
            newHash.op_aset(threadContext, STALLING_THREADS_KEY, formatThreadsByPlugin(threadContext));
            return newHash;
        }

        @JRubyMethod(name = {"to_s", "to_str"})
        public RubyString toStr(ThreadContext threadContext) {
            return toSimpleHash(threadContext).to_s(threadContext);
        }

        @JRubyMethod(name = {"method_missing"})
        public IRubyObject methodMissing(ThreadContext threadContext, IRubyObject iRubyObject) {
            return this.data.op_aref(threadContext, iRubyObject);
        }

        @JRubyMethod(name = {"format_threads_by_plugin"})
        public RubyHash formatThreadsByPlugin(ThreadContext threadContext) {
            RubyHash newHash = RubyHash.newHash(threadContext.runtime);
            ((Iterable) this.data.get(STALLING_THREADS_KEY.intern())).forEach(obj -> {
                RubyHash rubyHash = (RubyHash) obj;
                IRubyObject delete = rubyHash.delete(threadContext, PLUGIN_KEY, Block.NULL_BLOCK);
                if (delete.isNil()) {
                    delete = OTHER_KEY;
                }
                if (newHash.op_aref(threadContext, delete).isNil()) {
                    newHash.op_aset(threadContext, delete, threadContext.runtime.newArray());
                }
                newHash.op_aref(threadContext, delete).append(rubyHash);
            });
            return newHash;
        }
    }

    public PipelineReporterExt(Ruby ruby, RubyClass rubyClass) {
        super(ruby, rubyClass);
    }

    @JRubyMethod
    public PipelineReporterExt initialize(ThreadContext threadContext, IRubyObject iRubyObject, IRubyObject iRubyObject2) {
        this.logger = iRubyObject;
        this.pipeline = iRubyObject2;
        return this;
    }

    @JRubyMethod
    public IRubyObject pipeline() {
        return this.pipeline;
    }

    @JRubyMethod
    public IRubyObject logger() {
        return this.logger;
    }

    @JRubyMethod
    public SnapshotExt snapshot(ThreadContext threadContext) {
        return new SnapshotExt(threadContext.runtime, RubyUtil.PIPELINE_REPORTER_SNAPSHOT_CLASS).initialize(toHash(threadContext));
    }

    @JRubyMethod(name = {"to_hash"})
    public RubyHash toHash(ThreadContext threadContext) {
        RubyHash newHash = RubyHash.newHash(threadContext.runtime);
        RubyArray workerStates = workerStates(threadContext, (RubyHash) this.pipeline.callMethod(threadContext, "filter_queue_client").callMethod(threadContext, "inflight_batches"));
        newHash.op_aset(threadContext, WORKER_STATES_KEY, workerStates);
        newHash.op_aset(threadContext, EVENTS_FILTERED_KEY, this.pipeline.callMethod(threadContext, "events_filtered").callMethod(threadContext, "sum"));
        newHash.op_aset(threadContext, EVENTS_CONSUMED_KEY, this.pipeline.callMethod(threadContext, "events_consumed").callMethod(threadContext, "sum"));
        newHash.op_aset(threadContext, OUTPUT_INFO_KEY, outputInfo(threadContext));
        newHash.op_aset(threadContext, THREAD_INFO_KEY, this.pipeline.callMethod(threadContext, "plugin_threads_info"));
        newHash.op_aset(threadContext, STALLING_THREADS_INFO_KEY, this.pipeline.callMethod(threadContext, "stalling_threads_info"));
        newHash.op_aset(threadContext, INFLIGHT_COUNT_KEY, threadContext.runtime.newFixnum(calcInflightCount(threadContext, workerStates)));
        return newHash;
    }

    private RubyArray workerStates(ThreadContext threadContext, RubyHash rubyHash) {
        RubyArray newArray = threadContext.runtime.newArray();
        this.pipeline.callMethod(threadContext, "worker_threads").forEach(iRubyObject -> {
            RubyHash newHash = RubyHash.newHash(threadContext.runtime);
            IRubyObject callMethod = iRubyObject.callMethod(threadContext, "status");
            if (callMethod.isNil()) {
                callMethod = DEAD_STATUS;
            }
            newHash.op_aset(threadContext, STATUS_KEY, callMethod);
            newHash.op_aset(threadContext, ALIVE_KEY, iRubyObject.callMethod(threadContext, "alive?"));
            newHash.op_aset(threadContext, INDEX_KEY, threadContext.runtime.newFixnum(newArray.size()));
            IRubyObject op_aref = rubyHash.op_aref(threadContext, iRubyObject);
            newHash.op_aset(threadContext, INFLIGHT_COUNT_KEY, op_aref.isNil() ? threadContext.runtime.newFixnum(0) : op_aref.callMethod(threadContext, "size"));
            newArray.add(newHash);
        });
        return newArray;
    }

    private RubyArray outputInfo(ThreadContext threadContext) {
        RubyArray newArray = threadContext.runtime.newArray();
        Iterable callMethod = this.pipeline.callMethod(threadContext, "outputs");
        (callMethod instanceof Iterable ? callMethod : (Iterable) callMethod.toJava(Iterable.class)).forEach(iRubyObject -> {
            AbstractOutputDelegatorExt abstractOutputDelegatorExt = (AbstractOutputDelegatorExt) iRubyObject;
            RubyHash newHash = RubyHash.newHash(threadContext.runtime);
            newHash.op_aset(threadContext, TYPE_KEY, abstractOutputDelegatorExt.configName(threadContext));
            newHash.op_aset(threadContext, ID_KEY, abstractOutputDelegatorExt.getId());
            newHash.op_aset(threadContext, CONCURRENCY_KEY, abstractOutputDelegatorExt.concurrency(threadContext));
            newArray.add(newHash);
        });
        return newArray;
    }

    private static int calcInflightCount(ThreadContext threadContext, Collection<?> collection) {
        return collection.stream().mapToInt(obj -> {
            return ((RubyHash) obj).op_aref(threadContext, INFLIGHT_COUNT_KEY).convertToInteger().getIntValue();
        }).sum();
    }
}
