package org.logstash.beats;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLHandshakeException;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Supplier;

/* loaded from: input_file:org/logstash/beats/BeatsHandler.class */
public class BeatsHandler extends SimpleChannelInboundHandler<Batch> {
    private static final Logger logger = LogManager.getLogger();
    private final IMessageListener messageListener;
    private ChannelHandlerContext context;

    public BeatsHandler(IMessageListener iMessageListener) {
        this.messageListener = iMessageListener;
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        this.context = channelHandlerContext;
        logger.trace("{}", new Supplier[]{() -> {
            return format("Channel Active");
        }});
        super.channelActive(channelHandlerContext);
        this.messageListener.onNewConnection(channelHandlerContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        logger.trace("{}", new Supplier[]{() -> {
            return format("Channel Inactive");
        }});
        this.messageListener.onConnectionClose(channelHandlerContext);
    }

    public void channelRead0(ChannelHandlerContext channelHandlerContext, Batch batch) throws Exception {
        logger.debug("{}", new Supplier[]{() -> {
            return format("Received a new payload");
        }});
        try {
            for (Message message : batch) {
                logger.debug("{}", new Supplier[]{() -> {
                    return format("Sending a new message for the listener, sequence: " + message.getSequence());
                }});
                this.messageListener.onNewMessage(channelHandlerContext, message);
                if (needAck(message)) {
                    ack(channelHandlerContext, message);
                }
            }
            ((AtomicBoolean) channelHandlerContext.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).set(false);
            logger.debug("{}: batches pending: {}", new Supplier[]{() -> {
                return channelHandlerContext.channel().id().asShortText();
            }, () -> {
                return Boolean.valueOf(((AtomicBoolean) channelHandlerContext.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).get());
            }});
            batch.release();
            channelHandlerContext.flush();
        } catch (Throwable th) {
            ((AtomicBoolean) channelHandlerContext.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).set(false);
            logger.debug("{}: batches pending: {}", new Supplier[]{() -> {
                return channelHandlerContext.channel().id().asShortText();
            }, () -> {
                return Boolean.valueOf(((AtomicBoolean) channelHandlerContext.channel().attr(ConnectionHandler.CHANNEL_SEND_KEEP_ALIVE).get()).get());
            }});
            batch.release();
            channelHandlerContext.flush();
            throw th;
        }
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        try {
            if (!(th instanceof SSLHandshakeException)) {
                this.messageListener.onException(channelHandlerContext, th);
            }
            String cls = th.getMessage() == null ? th.getClass().toString() : th.getMessage();
            logger.info("{}", new Supplier[]{() -> {
                return format("Handling exception: " + cls);
            }});
            logger.catching(Level.DEBUG, th);
            channelHandlerContext.flush();
            channelHandlerContext.close();
        } catch (Throwable th2) {
            channelHandlerContext.flush();
            channelHandlerContext.close();
            throw th2;
        }
    }

    private boolean needAck(Message message) {
        return message.getSequence() == message.getBatch().getBatchSize();
    }

    private void ack(ChannelHandlerContext channelHandlerContext, Message message) {
        logger.trace("{}", new Supplier[]{() -> {
            return format("Acking message number " + message.getSequence());
        }});
        writeAck(channelHandlerContext, message.getBatch().getProtocol(), message.getSequence());
    }

    private void writeAck(ChannelHandlerContext channelHandlerContext, byte b, int i) {
        channelHandlerContext.write(new Ack(b, i));
    }

    private String format(String str) {
        String str2;
        String str3;
        SocketAddress localAddress = this.context.channel().localAddress();
        SocketAddress remoteAddress = this.context.channel().remoteAddress();
        if (localAddress == null || !(localAddress instanceof InetSocketAddress)) {
            str2 = "undefined";
        } else {
            InetSocketAddress inetSocketAddress = (InetSocketAddress) localAddress;
            str2 = inetSocketAddress.getAddress().getHostAddress() + ":" + inetSocketAddress.getPort();
        }
        if (remoteAddress == null || !(remoteAddress instanceof InetSocketAddress)) {
            str3 = "undefined";
        } else {
            InetSocketAddress inetSocketAddress2 = (InetSocketAddress) localAddress;
            str3 = inetSocketAddress2.getAddress().getHostAddress() + ":" + inetSocketAddress2.getPort();
        }
        return "[local: " + str2 + ", remote: " + str3 + "] " + str;
    }
}
