package org.opendaylight.ocpjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.ocpjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.ocpjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.yang.gen.v1.urn.opendaylight.ocp.common.types.rev150811.OcpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/ocpjava/protocol/impl/core/connection/OutboundQueueManager.class */
public final class OutboundQueueManager<T extends OutboundQueueHandler> extends ChannelInboundHandlerAdapter implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
    private static final int DEFAULT_LOW_WATERMARK = 131072;
    private static final int DEFAULT_HIGH_WATERMARK = 262144;
    private final StackedOutboundQueue currentQueue;
    private final ConnectionAdapterImpl parent;
    private final InetSocketAddress address;
    private final int maxNonBarrierMessages;
    private final T handler;
    private boolean alreadyReading;
    private boolean shuttingDown;
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private volatile PipelineState state = PipelineState.IDLE;
    private final Runnable flushRunnable = new Runnable() { // from class: org.opendaylight.ocpjava.protocol.impl.core.connection.OutboundQueueManager.1
        @Override // java.lang.Runnable
        public void run() {
            OutboundQueueManager.this.flush();
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.opendaylight.ocpjava.protocol.impl.core.connection.OutboundQueueManager$2, reason: invalid class name */
    /* loaded from: input_file:org/opendaylight/ocpjava/protocol/impl/core/connection/OutboundQueueManager$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$opendaylight$ocpjava$protocol$impl$core$connection$OutboundQueueManager$PipelineState = new int[PipelineState.values().length];

        static {
            try {
                $SwitchMap$org$opendaylight$ocpjava$protocol$impl$core$connection$OutboundQueueManager$PipelineState[PipelineState.READING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$opendaylight$ocpjava$protocol$impl$core$connection$OutboundQueueManager$PipelineState[PipelineState.WRITING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$opendaylight$ocpjava$protocol$impl$core$connection$OutboundQueueManager$PipelineState[PipelineState.IDLE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/ocpjava/protocol/impl/core/connection/OutboundQueueManager$PipelineState.class */
    public enum PipelineState {
        IDLE,
        READING,
        WRITING
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public OutboundQueueManager(ConnectionAdapterImpl connectionAdapterImpl, InetSocketAddress inetSocketAddress, T t, int i) {
        this.parent = (ConnectionAdapterImpl) Preconditions.checkNotNull(connectionAdapterImpl);
        this.handler = (T) Preconditions.checkNotNull(t);
        Preconditions.checkArgument(i > 0);
        this.maxNonBarrierMessages = i;
        this.address = inetSocketAddress;
        this.currentQueue = new StackedOutboundQueue(this);
        LOG.debug("Queue manager instantiated with queue {}", this.currentQueue);
        t.onConnectionQueueChanged(this.currentQueue);
    }

    T getHandler() {
        return this.handler;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.handler.onConnectionQueueChanged((OutboundQueue) null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean onMessage(OcpHeader ocpHeader) {
        LOG.trace("Attempting to pair message {} to a request", ocpHeader);
        return this.currentQueue.pairRequest(ocpHeader);
    }

    private void scheduleFlush() {
        if (!this.flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Flush task is already present on channel {}", this.parent.getChannel());
        } else {
            LOG.trace("Scheduling flush task on channel {}", this.parent.getChannel());
            this.parent.getChannel().eventLoop().execute(this.flushRunnable);
        }
    }

    private void rescheduleFlush() {
        if (!this.flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", this.parent.getChannel(), this);
        }
        conditionalFlush();
    }

    private void writeAndFlush() {
        this.state = PipelineState.WRITING;
        long nanoTime = System.nanoTime();
        int writeEntries = this.currentQueue.writeEntries(this.parent.getChannel(), nanoTime);
        if (writeEntries > 0) {
            LOG.trace("Flushing channel {}", this.parent.getChannel());
            this.parent.getChannel().flush();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flushed {} messages to channel {} in {}us", new Object[]{Integer.valueOf(writeEntries), this.parent.getChannel(), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime))});
        }
        this.state = PipelineState.IDLE;
    }

    protected void flush() {
        if (!this.shuttingDown) {
            LOG.trace("Dequeuing messages to channel {}", this.parent.getChannel());
            writeAndFlush();
            rescheduleFlush();
        } else if (this.currentQueue.finishShutdown()) {
            this.handler.onConnectionQueueChanged((OutboundQueue) null);
            LOG.debug("Channel {} shutdown complete", this.parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", this.parent.getChannel());
            rescheduleFlush();
        }
    }

    private void conditionalFlush() {
        if (!this.currentQueue.needsFlush()) {
            LOG.trace("Queue is empty, no flush needed");
        } else if (this.shuttingDown || this.parent.getChannel().isWritable()) {
            scheduleFlush();
        } else {
            LOG.debug("Channel {} is not I/O ready, not scheduling a flush", this.parent.getChannel());
        }
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        conditionalFlush();
    }

    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
        channelHandlerContext.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
        super.handlerAdded(channelHandlerContext);
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelWritabilityChanged(channelHandlerContext);
        LOG.debug("Channel {} writability changed, invoking flush", this.parent.getChannel());
        writeAndFlush();
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        LOG.debug("Channel {} initiating shutdown...", channelHandlerContext.channel());
        this.shuttingDown = true;
        LOG.debug("Cleared {} queue entries from channel {}", Long.valueOf(this.currentQueue.startShutdown(channelHandlerContext.channel())), channelHandlerContext.channel());
        scheduleFlush();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!this.alreadyReading) {
            this.alreadyReading = true;
            this.state = PipelineState.READING;
        }
        super.channelRead(channelHandlerContext, obj);
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
        writeAndFlush();
    }

    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", this.parent.getChannel(), Boolean.valueOf(this.flushScheduled.get()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureFlushing() {
        if (this.parent.getChannel().isWritable()) {
            PipelineState pipelineState = this.state;
            LOG.debug("Synchronize on pipeline state {}", pipelineState);
            switch (AnonymousClass2.$SwitchMap$org$opendaylight$ocpjava$protocol$impl$core$connection$OutboundQueueManager$PipelineState[pipelineState.ordinal()]) {
                case ConnectionAdapterImpl.RPC_RESPONSE_EXPIRATION /* 1 */:
                    return;
                case 2:
                case 3:
                default:
                    scheduleFlush();
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeMessage(OcpHeader ocpHeader, long j) {
        this.parent.getChannel().write(new MessageListenerWrapper(ocpHeader, null));
    }
}
