package org.apache.nifi.processors.beats.handler;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.beats.protocol.Batch;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.protocol.MessageAck;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/nifi/processors/beats/handler/BatchChannelInboundHandler.class */
public class BatchChannelInboundHandler extends SimpleChannelInboundHandler<Batch> {
    private final ComponentLog log;
    private final BlockingQueue<BatchMessage> messages;

    public BatchChannelInboundHandler(ComponentLog componentLog, BlockingQueue<BatchMessage> blockingQueue) {
        this.log = (ComponentLog) Objects.requireNonNull(componentLog, "Component Log required");
        this.messages = (BlockingQueue) Objects.requireNonNull(blockingQueue, "Message Queue required");
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void channelRead0(ChannelHandlerContext channelHandlerContext, Batch batch) {
        Integer num = null;
        Collection<BatchMessage> messages = batch.getMessages();
        int i = 0;
        Iterator<BatchMessage> it = messages.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            BatchMessage next = it.next();
            int sequenceNumber = next.getSequenceNumber();
            String sender = next.getSender();
            if (!this.messages.offer(next)) {
                this.log.warn("Message Sequence Number [{}] Sender [{}] queuing failed: Queued [{}] of [{}]", new Object[]{Integer.valueOf(sequenceNumber), sender, Integer.valueOf(i), Integer.valueOf(messages.size())});
                break;
            } else {
                this.log.debug("Message Sequence Number [{}] Sender [{}] queued", new Object[]{Integer.valueOf(sequenceNumber), sender});
                num = Integer.valueOf(next.getSequenceNumber());
                i++;
            }
        }
        if (num == null) {
            this.log.warn("Batch Messages [{}] queuing failed", new Object[]{Integer.valueOf(batch.getMessages().size())});
        } else {
            channelHandlerContext.writeAndFlush(new MessageAck(num.intValue()));
        }
    }
}
