/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish.netty.pipeline;

import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.impl.AtomicDouble;
import io.mantisrx.publish.config.MrePublishConfiguration;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.pipeline.HttpEventChannel;
import io.mantisrx.publish.netty.pipeline.HttpEventChannelInitializer;
import io.mantisrx.publish.netty.pipeline.MantisMessageSizeEstimator;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.DefaultEventLoopGroup;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates and caches one Netty {@link Channel} per remote {@code host:port}.
 *
 * <p>Channels are created lazily by {@link #findOrCreate(InetSocketAddress)} and are dropped
 * from the cache by a listener on their close future. Connection outcomes, the number of
 * cached channels and an estimate of the pending write-buffer usage are published through
 * Spectator meters tagged {@code channel=netty}.
 */
public class HttpEventChannelManager {
    private static final Logger LOG = LoggerFactory.getLogger(HttpEventChannelManager.class);
    private final Counter connectionSuccess;
    private final Counter connectionFailure;
    private final AtomicDouble liveConnections;
    private final AtomicDouble nettyChannelBufferSize;
    private final int lowWriteBufferWatermark;
    private final int highWriteBufferWatermark;
    private final EventLoopGroup eventLoopGroup;
    /** Event loop group handed to the channel initializer; {@code null} when gzip is disabled. */
    private final EventLoopGroup encoderEventLoopGroup;
    private final Bootstrap bootstrap;
    /** Cached channels keyed by {@code host:port}. */
    private final ConcurrentMap<String, Channel> channels;

    /**
     * Builds the meters, event loop groups and the shared {@link Bootstrap}, and registers a
     * JVM shutdown hook that gracefully stops the event loop groups created here.
     *
     * @param registry Spectator registry the meters are registered with
     * @param config   source of the write-buffer watermarks, thread counts and gzip flag
     */
    public HttpEventChannelManager(Registry registry, MrePublishConfiguration config) {
        this.connectionSuccess = SpectatorUtils.buildAndRegisterCounter(registry, "connectionSuccess", new String[]{"channel", "netty"});
        this.connectionFailure = SpectatorUtils.buildAndRegisterCounter(registry, "connectionFailure", new String[]{"channel", "netty"});
        this.liveConnections = SpectatorUtils.buildAndRegisterGauge(registry, "liveConnections", new String[]{"channel", "netty"});
        this.nettyChannelBufferSize = SpectatorUtils.buildAndRegisterGauge(registry, "bufferSize", new String[]{"channel", "netty"});
        this.lowWriteBufferWatermark = config.getLowWriteBufferWatermark();
        this.highWriteBufferWatermark = config.getHighWriteBufferWatermark();
        this.eventLoopGroup = new NioEventLoopGroup(config.getIoThreads());
        boolean gzipEnabled = config.getGzipEnabled();
        this.encoderEventLoopGroup = gzipEnabled ? new DefaultEventLoopGroup(config.getCompressionThreads()) : null;
        this.bootstrap = new Bootstrap()
                .group(this.eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.MESSAGE_SIZE_ESTIMATOR, MantisMessageSizeEstimator.DEFAULT)
                .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(this.lowWriteBufferWatermark, this.highWriteBufferWatermark))
                .handler(new HttpEventChannelInitializer(registry, config, this.encoderEventLoopGroup));
        this.channels = new ConcurrentHashMap<String, Channel>();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            this.eventLoopGroup.shutdownGracefully();
            // The encoder group only exists when gzip is enabled; stop it as well so its
            // threads are released together with the I/O threads.
            if (this.encoderEventLoopGroup != null) {
                this.encoderEventLoopGroup.shutdownGracefully();
            }
        }));
    }

    /**
     * Returns the cached channel for {@code address}, starting a new connection when none is
     * cached. Also refreshes the buffer-size gauge with the high watermark minus the channel's
     * {@code bytesBeforeUnwritable()}.
     *
     * <p>The returned channel may still be connecting: the connect call is asynchronous and
     * this method does not wait for it to complete.
     *
     * @param address remote endpoint to connect to
     * @return the cached or newly created channel, never {@code null}
     */
    Channel findOrCreate(InetSocketAddress address) {
        String key = this.getHostPortString(address);
        Channel channel = this.channels.get(key);
        if (channel == null) {
            channel = this.connect(address, key);
        }
        this.nettyChannelBufferSize.set((double) ((long) this.highWriteBufferWatermark - channel.bytesBeforeUnwritable()));
        return channel;
    }

    /**
     * Starts a connection to {@code address}, caches the resulting channel under {@code key}
     * and attaches the listeners that maintain the cache and the connection meters.
     */
    private Channel connect(InetSocketAddress address, String key) {
        LOG.debug("creating new channel for {}", address);
        ChannelFuture connectFuture = this.bootstrap.connect(address);
        Channel created = connectFuture.channel();
        // Cache before attaching the close listener so that a listener firing immediately
        // still finds (and removes) the entry.
        this.channels.put(key, created);
        created.closeFuture().addListener(future -> {
            LOG.debug("closing channel for {}", address);
            // Conditional remove: only evict the mapping if it still points at this channel,
            // so a late close of a stale channel cannot drop a newer one for the same key.
            this.channels.remove(key, created);
            this.liveConnections.set((double) this.channels.size());
        });
        connectFuture.addListener(future -> {
            if (future.isSuccess()) {
                LOG.debug("connection success for {}", address);
                this.connectionSuccess.increment();
                this.liveConnections.set((double) this.channels.size());
            } else {
                // NOTE(review): the cache entry is only removed by the close listener above;
                // this relies on the channel being closed after a failed connect — confirm.
                LOG.debug("failed to connect to {}", address);
                this.connectionFailure.increment();
            }
        });
        return created;
    }

    /** Looks up the cached channel for {@code address}; returns {@code null} when absent. */
    private Channel find(InetSocketAddress address) {
        return this.channels.get(this.getHostPortString(address));
    }

    /**
     * Requests a close of the cached channel for {@code address}, if there is one. The cache
     * entry itself is removed by the channel's close listener, not by this method.
     *
     * @param address remote endpoint whose channel should be closed
     */
    void close(InetSocketAddress address) {
        Channel channel = this.find(address);
        if (channel != null) {
            channel.close();
        }
    }

    /** Builds the {@code host:port} cache key for {@code address}. */
    private String getHostPortString(InetSocketAddress address) {
        return address.getHostString() + ':' + address.getPort();
    }
}

