package org.apache.nifi.processors.beats.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.FrameType;
import org.apache.nifi.processors.beats.protocol.FrameTypeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolCodeDecoder;
import org.apache.nifi.processors.beats.protocol.ProtocolException;
import org.apache.nifi.processors.beats.protocol.ProtocolVersion;
import org.apache.nifi.processors.beats.protocol.ProtocolVersionDecoder;

/* loaded from: input_file:org/apache/nifi/processors/beats/handler/BatchDecoder.class */
public class BatchDecoder extends ByteToMessageDecoder {
    private static final int INITIAL_WINDOW_SIZE = 1;
    private static final int INITIAL_QUEUE_SIZE = 1;
    private static final int CODE_READABLE_BYTES = 1;
    private static final int INT_READABLE_BYTES = 4;
    private static final ProtocolCodeDecoder<ProtocolVersion> VERSION_DECODER = new ProtocolVersionDecoder();
    private static final ProtocolCodeDecoder<FrameType> FRAME_TYPE_DECODER = new FrameTypeDecoder();
    private final ComponentLog log;
    private final AtomicReference<ProtocolVersion> versionRef = new AtomicReference<>();
    private final AtomicReference<FrameType> frameTypeRef = new AtomicReference<>();
    private final AtomicInteger windowSize = new AtomicInteger(1);
    private final AtomicReference<Integer> sequenceNumberRef = new AtomicReference<>();
    private final AtomicReference<Integer> payloadSizeRef = new AtomicReference<>();
    private final AtomicReference<Integer> compressedSizeRef = new AtomicReference<>();
    private Queue<BatchMessage> batchMessages = new ArrayBlockingQueue(1);

    public BatchDecoder(ComponentLog componentLog) {
        this.log = (ComponentLog) Objects.requireNonNull(componentLog, "Component Log required");
    }

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        ProtocolVersion readVersion = readVersion(byteBuf);
        if (ProtocolVersion.VERSION_2 == readVersion) {
            decodeFrameType(readFrameType(byteBuf), channelHandlerContext, byteBuf, list);
        } else if (ProtocolVersion.VERSION_1 == readVersion) {
            throw new ProtocolException("Protocol Version [1] not supported");
        }
    }

    private void decodeFrameType(FrameType frameType, ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        if (frameType == null) {
            this.log.trace("Frame Type not found");
            return;
        }
        if (FrameType.COMPRESSED == frameType) {
            processCompressed(channelHandlerContext, byteBuf, list);
        } else if (FrameType.WINDOW_SIZE == frameType) {
            processWindowSize(channelHandlerContext, byteBuf);
        } else {
            if (FrameType.JSON != frameType) {
                throw new ProtocolException(String.format("Frame Type [%s] not supported", frameType));
            }
            processJson(channelHandlerContext, byteBuf, list);
        }
    }

    private void processWindowSize(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
        Integer readUnsignedInteger = readUnsignedInteger(byteBuf);
        if (readUnsignedInteger == null) {
            this.log.trace("State [Read Window Size] not enough readable bytes");
            return;
        }
        this.windowSize.getAndSet(readUnsignedInteger.intValue());
        this.batchMessages = new ArrayBlockingQueue(readUnsignedInteger.intValue());
        resetFrameTypeVersion();
        Channel channel = channelHandlerContext.channel();
        this.log.debug("Processed Window Size [{}] Local [{}] Remote [{}]", new Object[]{readUnsignedInteger, channel.localAddress(), channel.remoteAddress()});
    }

    private void processCompressed(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        Integer readCompressedSize = readCompressedSize(byteBuf);
        if (readCompressedSize == null) {
            this.log.trace("State [Read Compressed] not enough readable bytes");
            return;
        }
        int readableBytes = byteBuf.readableBytes();
        if (readableBytes < readCompressedSize.intValue()) {
            this.log.trace("State [Read Compressed] not enough readable bytes [{}] for compressed [{}]", new Object[]{Integer.valueOf(readableBytes), readCompressedSize});
            return;
        }
        Channel channel = channelHandlerContext.channel();
        this.log.debug("Processing Compressed Size [{}] Local [{}] Remote [{}]", new Object[]{readCompressedSize, channel.localAddress(), channel.remoteAddress()});
        processCompressed(channelHandlerContext, byteBuf, readCompressedSize.intValue(), list);
    }

    private void processCompressed(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, int i, List<Object> list) {
        ByteBuf buffer = channelHandlerContext.alloc().buffer(i);
        try {
            readCompressedBuffer(byteBuf, buffer, i);
            resetSequenceVersionPayloadSize();
            resetFrameTypeVersion();
            while (buffer.isReadable()) {
                decode(channelHandlerContext, buffer, list);
            }
        } finally {
            this.compressedSizeRef.set(null);
            buffer.release();
        }
    }

    private void processJson(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        Channel channel = channelHandlerContext.channel();
        Integer readSequenceNumber = readSequenceNumber(byteBuf);
        if (readSequenceNumber == null) {
            this.log.trace("State [Read JSON] Sequence Number not found Remote [{}]", new Object[]{channel.remoteAddress()});
            return;
        }
        Integer readPayloadSize = readPayloadSize(byteBuf);
        if (readPayloadSize == null) {
            this.log.trace("State [Read JSON] Payload Size not found Remote [{}]", new Object[]{channel.remoteAddress()});
        } else {
            processJson(readSequenceNumber.intValue(), readPayloadSize.intValue(), channelHandlerContext, byteBuf, list);
        }
    }

    private void processJson(int i, int i2, ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
        Channel channel = channelHandlerContext.channel();
        BatchMessage readJsonMessage = readJsonMessage(channelHandlerContext, i, i2, byteBuf);
        if (readJsonMessage == null) {
            this.log.trace("State [Read JSON] Message not found Remote [{}]", new Object[]{channel.remoteAddress()});
        } else {
            processBatchMessage(readJsonMessage, list);
            this.log.debug("Processed JSON Message Sequence Number [{}] Payload Size [{}] Local [{}] Remote [{}]", new Object[]{Integer.valueOf(i), Integer.valueOf(i2), channel.localAddress(), channel.remoteAddress()});
        }
    }

    private BatchMessage readJsonMessage(ChannelHandlerContext channelHandlerContext, int i, int i2, ByteBuf byteBuf) {
        BatchMessage batchMessage;
        int readableBytes = byteBuf.readableBytes();
        if (readableBytes >= i2) {
            byte[] bArr = new byte[i2];
            byteBuf.readBytes(bArr);
            batchMessage = new BatchMessage(getRemoteHostAddress(channelHandlerContext.channel()), bArr, i);
        } else {
            batchMessage = null;
            this.log.trace("State [Read JSON] Sequence Number [{}] not enough readable bytes [{}] for payload [{}]", new Object[]{Integer.valueOf(i), Integer.valueOf(readableBytes), Integer.valueOf(i2)});
        }
        return batchMessage;
    }

    private String getRemoteHostAddress(Channel channel) {
        SocketAddress remoteAddress = channel.remoteAddress();
        return remoteAddress instanceof InetSocketAddress ? ((InetSocketAddress) remoteAddress).getAddress().getHostAddress() : remoteAddress.toString();
    }

    private void processBatchMessage(BatchMessage batchMessage, List<Object> list) {
        if (!this.batchMessages.offer(batchMessage)) {
            throw new ProtocolException(String.format("Received message exceeds Window Size [%d]", Integer.valueOf(this.windowSize.get())));
        }
        resetSequenceVersionPayloadSize();
        resetFrameTypeVersion();
        if (this.windowSize.get() == this.batchMessages.size()) {
            list.add(new Batch(new ArrayList(this.batchMessages)));
            resetWindowSize();
        }
    }

    private void readCompressedBuffer(ByteBuf byteBuf, ByteBuf byteBuf2, int i) {
        Inflater inflater = new Inflater();
        try {
            try {
                ByteBufOutputStream byteBufOutputStream = new ByteBufOutputStream(byteBuf2);
                try {
                    InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(byteBufOutputStream, inflater);
                    try {
                        byteBuf.readBytes(inflaterOutputStream, i);
                        inflaterOutputStream.close();
                        byteBufOutputStream.close();
                    } catch (Throwable th) {
                        try {
                            inflaterOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                        throw th;
                    }
                } catch (Throwable th3) {
                    try {
                        byteBufOutputStream.close();
                    } catch (Throwable th4) {
                        th3.addSuppressed(th4);
                    }
                    throw th3;
                }
            } finally {
                inflater.end();
            }
        } catch (IOException e) {
            throw new ProtocolException(String.format("Read Compressed Payload Size [%d] failed", Integer.valueOf(i)), e);
        }
    }

    private Integer readSequenceNumber(ByteBuf byteBuf) {
        if (this.sequenceNumberRef.get() == null) {
            Integer readUnsignedInteger = readUnsignedInteger(byteBuf);
            if (readUnsignedInteger == null) {
                this.log.trace("State [Read JSON] not enough readable bytes for Sequence Number");
            } else {
                this.sequenceNumberRef.set(readUnsignedInteger);
            }
        }
        return this.sequenceNumberRef.get();
    }

    private Integer readPayloadSize(ByteBuf byteBuf) {
        if (this.payloadSizeRef.get() == null) {
            Integer readUnsignedInteger = readUnsignedInteger(byteBuf);
            if (readUnsignedInteger == null) {
                this.log.trace("State [Read JSON] not enough readable bytes for Payload Size");
            } else {
                this.payloadSizeRef.set(readUnsignedInteger);
            }
        }
        return this.payloadSizeRef.get();
    }

    private Integer readCompressedSize(ByteBuf byteBuf) {
        if (this.compressedSizeRef.get() == null) {
            Integer readUnsignedInteger = readUnsignedInteger(byteBuf);
            if (readUnsignedInteger == null) {
                this.log.trace("State [Read Compressed] not enough readable bytes for Compressed Size");
            } else {
                this.compressedSizeRef.set(readUnsignedInteger);
            }
        }
        return this.compressedSizeRef.get();
    }

    private Integer readUnsignedInteger(ByteBuf byteBuf) {
        return byteBuf.readableBytes() >= INT_READABLE_BYTES ? Integer.valueOf(Math.toIntExact(byteBuf.readUnsignedInt())) : null;
    }

    private FrameType readFrameType(ByteBuf byteBuf) {
        if (this.frameTypeRef.get() == null) {
            int readableBytes = byteBuf.readableBytes();
            if (readableBytes >= 1) {
                this.frameTypeRef.set(FRAME_TYPE_DECODER.readProtocolCode(byteBuf.readByte()));
            } else {
                this.log.trace("State [Read Frame Type] not enough readable bytes [{}]", new Object[]{Integer.valueOf(readableBytes)});
            }
        }
        return this.frameTypeRef.get();
    }

    private ProtocolVersion readVersion(ByteBuf byteBuf) {
        if (this.versionRef.get() == null) {
            int readableBytes = byteBuf.readableBytes();
            if (readableBytes >= 1) {
                this.versionRef.set(VERSION_DECODER.readProtocolCode(byteBuf.readByte()));
            } else {
                this.log.trace("State [Read Version] not enough readable bytes [{}]", new Object[]{Integer.valueOf(readableBytes)});
            }
        }
        return this.versionRef.get();
    }

    private void resetSequenceVersionPayloadSize() {
        this.sequenceNumberRef.set(null);
        this.payloadSizeRef.set(null);
    }

    private void resetFrameTypeVersion() {
        this.frameTypeRef.set(null);
        this.versionRef.set(null);
    }

    private void resetWindowSize() {
        this.windowSize.set(1);
        this.batchMessages.clear();
    }
}
