package org.opendaylight.ocpjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/opendaylight/ocpjava/protocol/impl/core/connection/ChannelOutboundQueue.class */
final class ChannelOutboundQueue extends ChannelInboundHandlerAdapter {
    private static final int WORKTIME_RECHECK_MSGS = 64;
    private final Runnable flushRunnable = new Runnable() { // from class: org.opendaylight.ocpjava.protocol.impl.core.connection.ChannelOutboundQueue.1
        @Override // java.lang.Runnable
        public void run() {
            ChannelOutboundQueue.this.flush();
        }
    };
    private volatile int flushScheduled = 0;
    private final Queue<MessageHolder<?>> queue;
    private final long maxWorkTime;
    private final Channel channel;
    private final InetSocketAddress address;
    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100);
    private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
    private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");

    /* loaded from: input_file:org/opendaylight/ocpjava/protocol/impl/core/connection/ChannelOutboundQueue$MessageHolder.class */
    public interface MessageHolder<T> {
        GenericFutureListener<Future<Void>> takeListener();

        T takeMessage();
    }

    public ChannelOutboundQueue(Channel channel, int i, InetSocketAddress inetSocketAddress) {
        Preconditions.checkArgument(i > 0, "Queue depth has to be positive");
        this.queue = new LinkedBlockingQueue(i);
        this.channel = (Channel) Preconditions.checkNotNull(channel);
        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
        this.address = inetSocketAddress;
    }

    public boolean enqueue(MessageHolder<?> messageHolder) {
        LOG.trace("Enqueuing message {}", messageHolder);
        if (!this.queue.offer(messageHolder)) {
            LOG.debug("Message queue is full");
            return false;
        }
        LOG.trace("Message enqueued");
        conditionalFlush();
        return true;
    }

    private void scheduleFlush(EventExecutor eventExecutor) {
        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
            LOG.trace("Flush task is already present");
        } else {
            LOG.trace("Scheduling flush task");
            eventExecutor.execute(this.flushRunnable);
        }
    }

    private void conditionalFlush() {
        if (this.queue.isEmpty()) {
            LOG.trace("Queue is empty, not flush needed");
        } else if (this.channel.isWritable()) {
            scheduleFlush(this.channel.pipeline().lastContext().executor());
        } else {
            LOG.trace("Channel {} is not writable, not issuing a flush", this.channel);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void flush() {
        long j;
        long nanoTime = System.nanoTime();
        long j2 = nanoTime + this.maxWorkTime;
        LOG.debug("Dequeuing messages to channel {}", this.channel);
        long j3 = 0;
        while (true) {
            j = j3;
            if (!this.channel.isWritable()) {
                LOG.trace("Channel is no longer writable");
                break;
            }
            MessageHolder<?> poll = this.queue.poll();
            if (poll != null) {
                GenericFutureListener<Future<Void>> takeListener = poll.takeListener();
                ChannelFuture write = this.channel.write(new MessageListenerWrapper(poll.takeMessage(), takeListener));
                if (takeListener != null) {
                    write.addListener(takeListener);
                }
                if (j % 64 == 0 && System.nanoTime() >= j2) {
                    LOG.trace("Exceeded allotted work time {}us", Long.valueOf(TimeUnit.NANOSECONDS.toMicros(this.maxWorkTime)));
                    break;
                }
                j3 = j + 1;
            } else {
                LOG.trace("The queue is completely drained");
                break;
            }
        }
        if (j > 0) {
            LOG.debug("Flushing {} message(s) to channel {}", Long.valueOf(j), this.channel);
            this.channel.flush();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flushed {} messages in {}us to channel {}", new Object[]{Long.valueOf(j), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime)), this.channel});
        }
        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", this.channel, this.queue);
        }
        conditionalFlush();
    }

    private void conditionalFlush(ChannelHandlerContext channelHandlerContext) {
        Preconditions.checkState(channelHandlerContext.channel().equals(this.channel), "Inconsistent channel %s with context %s", this.channel, channelHandlerContext);
        conditionalFlush();
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        conditionalFlush(channelHandlerContext);
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelWritabilityChanged(channelHandlerContext);
        conditionalFlush(channelHandlerContext);
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        long j = 0;
        LOG.debug("Channel shutdown, flushing queue...");
        ChannelFuture newFailedFuture = channelHandlerContext.newFailedFuture(new RejectedExecutionException("Channel disconnected"));
        while (true) {
            MessageHolder<?> poll = this.queue.poll();
            if (poll == null) {
                LOG.debug("Flushed {} queue entries", Long.valueOf(j));
                return;
            } else {
                poll.takeListener().operationComplete(newFailedFuture);
                j++;
            }
        }
    }

    public String toString() {
        return String.format("Channel %s queue [%s messages flushing=%s]", this.channel, Integer.valueOf(this.queue.size()), Integer.valueOf(this.flushScheduled));
    }
}
