/*
 * Decompiled with CFR 0.152.
 */
package io.mantisrx.publish.netty.pipeline;

import com.netflix.mantis.discovery.proto.MantisWorker;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Timer;
import io.mantisrx.publish.EventChannel;
import io.mantisrx.publish.api.Event;
import io.mantisrx.publish.internal.exceptions.NonRetryableException;
import io.mantisrx.publish.internal.exceptions.RetryableException;
import io.mantisrx.publish.internal.metrics.SpectatorUtils;
import io.mantisrx.publish.netty.pipeline.HttpEventChannelManager;
import io.mantisrx.publish.netty.proto.MantisEvent;
import io.netty.channel.Channel;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventChannel} implementation that writes events to a worker over a Netty
 * {@link Channel} obtained from an {@link HttpEventChannelManager}.
 *
 * <p>Each instance registers the following meters on the supplied {@link Registry}, all tagged
 * with {@code channel=netty}: the counters {@code writeSuccess}, {@code writeFailure} and
 * {@code mantisEventsDropped} (additionally tagged {@code reason=nettyBufferFull}), and the timer
 * {@code writeTime}.
 */
public class HttpEventChannel
implements EventChannel {
    public static final String CHANNEL_TYPE = "netty";
    private static final Logger LOG = LoggerFactory.getLogger(HttpEventChannel.class);
    private final Registry registry;
    private final Counter writeSuccess;
    private final Counter writeFailure;
    private final Counter nettyChannelDropped;
    private final Timer nettyWriteTime;
    private final HttpEventChannelManager channelManager;

    /**
     * Creates the channel and registers its counters and timer.
     *
     * @param registry       registry the meters are registered on; its clock is also used to time writes
     * @param channelManager source of the per-address Netty channels
     */
    public HttpEventChannel(Registry registry, HttpEventChannelManager channelManager) {
        this.registry = registry;
        this.writeSuccess = SpectatorUtils.buildAndRegisterCounter(registry, "writeSuccess", new String[]{"channel", CHANNEL_TYPE});
        this.writeFailure = SpectatorUtils.buildAndRegisterCounter(registry, "writeFailure", new String[]{"channel", CHANNEL_TYPE});
        this.nettyChannelDropped = SpectatorUtils.buildAndRegisterCounter(registry, "mantisEventsDropped", new String[]{"channel", CHANNEL_TYPE, "reason", "nettyBufferFull"});
        this.nettyWriteTime = SpectatorUtils.buildAndRegisterTimer(registry, "writeTime", new String[]{"channel", CHANNEL_TYPE});
        this.channelManager = channelManager;
    }

    /**
     * Writes {@code event} to the channel associated with {@code worker}.
     *
     * <p>The returned future:
     * <ul>
     *   <li>completes normally once the write listener reports success;</li>
     *   <li>completes with a {@link RetryableException} if the write listener reports failure, or
     *       if the channel is active but not currently writable (the event is counted as dropped);</li>
     *   <li>completes with a {@link NonRetryableException} if the channel is not active.</li>
     * </ul>
     *
     * @param worker destination worker; its socket address selects the channel
     * @param event  event to send, serialized via {@code event.toJsonString()}
     * @return a future describing the outcome of the write
     */
    public CompletableFuture<Void> send(MantisWorker worker, Event event) {
        InetSocketAddress address = worker.toInetSocketAddress();
        Channel channel = this.channelManager.findOrCreate(address);
        CompletableFuture<Void> future = new CompletableFuture<Void>();

        if (!channel.isActive()) {
            future.completeExceptionally(new NonRetryableException("channel not active"));
            return future;
        }

        if (!channel.isWritable()) {
            // Read the value once so the log line and the exception message agree.
            long bytesBeforeWritable = channel.bytesBeforeWritable();
            LOG.debug("channel not writable: {} bytes before writable", bytesBeforeWritable);
            this.nettyChannelDropped.increment();
            future.completeExceptionally(new RetryableException("channel not writable: " + bytesBeforeWritable + " bytes before writable"));
            return future;
        }

        LOG.debug("channel is writable: {} bytes remaining", channel.bytesBeforeUnwritable());
        long nettyStart = this.registry.clock().wallTime();
        MantisEvent mantisEvent = new MantisEvent(1, event.toJsonString());
        channel.writeAndFlush(mantisEvent).addListener(f -> {
            if (f.isSuccess()) {
                this.writeSuccess.increment();
                future.complete(null);
            } else {
                LOG.debug("failed to send event over netty channel", f.cause());
                this.writeFailure.increment();
                future.completeExceptionally(new RetryableException(f.cause().getMessage()));
            }
        });
        // NOTE(review): the timer is recorded as soon as writeAndFlush() returns, not when the
        // listener fires, so it covers serialization plus handing the event to the channel.
        // Kept as-is to preserve the existing meaning of the writeTime metric.
        long nettyEnd = this.registry.clock().wallTime();
        this.nettyWriteTime.record(nettyEnd - nettyStart, TimeUnit.MILLISECONDS);
        return future;
    }

    /**
     * Returns {@code channel.bytesBeforeUnwritable()} divided by the channel's configured
     * write-buffer high water mark, for the channel associated with {@code worker}.
     *
     * <p>The division is performed in floating point so that fractional ratios are preserved;
     * integer division would truncate the quotient to a whole number before it is widened to
     * {@code double}.
     *
     * @param worker worker whose channel is inspected
     * @return the ratio described above
     */
    public double bufferSize(MantisWorker worker) {
        InetSocketAddress address = worker.toInetSocketAddress();
        Channel channel = this.channelManager.findOrCreate(address);
        long bytesBeforeUnwritable = channel.bytesBeforeUnwritable();
        int highWaterMark = channel.config().getWriteBufferHighWaterMark();
        return (double) bytesBeforeUnwritable / highWaterMark;
    }

    /**
     * Asks the channel manager to close the channel for {@code worker}'s socket address.
     *
     * @param worker worker whose channel should be closed
     */
    public void close(MantisWorker worker) {
        InetSocketAddress address = worker.toInetSocketAddress();
        this.channelManager.close(address);
    }
}

