/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish.netty.transmitters;

import com.netflix.mantis.discovery.proto.JobDiscoveryInfo;
import com.netflix.mantis.discovery.proto.MantisWorker;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.EventChannel;
import io.mantisrx.publish.EventTransmitter;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.discovery.MantisJobDiscovery;
import io.mantisrx.publish.internal.exceptions.NonRetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.transmitters.ChoiceOfTwoWorkerPool;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChoiceOfTwoEventTransmitter
implements EventTransmitter {
    private static final Logger LOG = LoggerFactory.getLogger(ChoiceOfTwoEventTransmitter.class);
    private final MrePublishConfiguration configuration;
    private final Registry registry;
    private final Timer channelSendTime;
    private final MantisJobDiscovery jobDiscovery;
    private final EventChannel eventChannel;
    private final ChoiceOfTwoWorkerPool workerPool;
    private final Counter noWorkersDroppedCount;
    private final Counter noDiscoveryDroppedCount;

    public ChoiceOfTwoEventTransmitter(MrePublishConfiguration config, Registry registry, MantisJobDiscovery jobDiscovery, EventChannel eventChannel) {
        this.configuration = config;
        this.registry = registry;
        this.channelSendTime = SpectatorUtils.buildAndRegisterTimer(registry, "sendTime", "channel", "netty");
        this.noWorkersDroppedCount = SpectatorUtils.buildAndRegisterCounter(registry, "mantisEventsDropped", "reason", "transmitterNoWorkers");
        this.noDiscoveryDroppedCount = SpectatorUtils.buildAndRegisterCounter(registry, "mantisEventsDropped", "reason", "transmitterNoDiscoveryInfo");
        this.jobDiscovery = jobDiscovery;
        this.eventChannel = eventChannel;
        this.workerPool = new ChoiceOfTwoWorkerPool(config, registry, this.eventChannel);
    }

    @Override
    public void send(Event event, String stream) {
        String app = this.configuration.appName();
        String jobCluster = this.jobDiscovery.getJobCluster(app, stream);
        Optional<JobDiscoveryInfo> jobDiscoveryInfo = this.jobDiscovery.getCurrentJobWorkers(jobCluster);
        if (jobDiscoveryInfo.isPresent()) {
            List<MantisWorker> workers = jobDiscoveryInfo.get().getIngestStageWorkers().getWorkers();
            int numWorkers = workers.size();
            if (numWorkers > 0) {
                this.workerPool.refresh(workers);
                long start = this.registry.clock().wallTime();
                try {
                    this.workerPool.record(event, this.eventChannel::send);
                }
                catch (NonRetryableException e2) {
                    LOG.trace("No workers for job cluster {}, dropping event", (Object)jobCluster);
                    this.noWorkersDroppedCount.increment();
                }
                long end = this.registry.clock().wallTime();
                this.channelSendTime.record(end - start, TimeUnit.MILLISECONDS);
            } else {
                LOG.trace("No workers for job cluster {}, dropping event", (Object)jobCluster);
                this.noWorkersDroppedCount.increment();
            }
        } else {
            LOG.trace("No job discovery info for job cluster {}, dropping event", (Object)jobCluster);
            this.noDiscoveryDroppedCount.increment();
        }
    }
}

