package org.opendaylight.openflowjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.openflowjava.protocol.impl.core.connection.AbstractStackedOutboundQueue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager.class */
public abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue> extends ChannelInboundHandlerAdapter implements AutoCloseable {
    private static final int DEFAULT_LOW_WATERMARK = 131072;
    private static final int DEFAULT_HIGH_WATERMARK = 262144;
    protected final ConnectionAdapterImpl parent;
    protected final InetSocketAddress address;
    private final T handler;
    private boolean alreadyReading;
    protected boolean shuttingDown;
    private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);
    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER = new GenericFutureListener<Future<Void>>() { // from class: org.opendaylight.openflowjava.protocol.impl.core.connection.AbstractOutboundQueueManager.2
        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");

        public void operationComplete(Future<Void> future) throws Exception {
            if (future.cause() != null) {
                this.LOGGER.warn("Message encoding fail !", future.cause());
            }
        }
    };
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private volatile PipelineState state = PipelineState.IDLE;
    protected final Runnable flushRunnable = new Runnable() { // from class: org.opendaylight.openflowjava.protocol.impl.core.connection.AbstractOutboundQueueManager.1
        @Override // java.lang.Runnable
        public void run() {
            AbstractOutboundQueueManager.this.flush();
        }
    };
    protected final O currentQueue = initializeStackedOutboudnqueue();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/opendaylight/openflowjava/protocol/impl/core/connection/AbstractOutboundQueueManager$PipelineState.class */
    public enum PipelineState {
        IDLE,
        READING,
        WRITING
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractOutboundQueueManager(ConnectionAdapterImpl connectionAdapterImpl, InetSocketAddress inetSocketAddress, T t) {
        this.parent = (ConnectionAdapterImpl) Preconditions.checkNotNull(connectionAdapterImpl);
        this.handler = (T) Preconditions.checkNotNull(t);
        this.address = inetSocketAddress;
        LOG.debug("Queue manager instantiated with queue {}", this.currentQueue);
        t.onConnectionQueueChanged(this.currentQueue);
    }

    protected abstract O initializeStackedOutboudnqueue();

    @Override // java.lang.AutoCloseable
    public void close() {
        this.handler.onConnectionQueueChanged((OutboundQueue) null);
    }

    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", this.parent.getChannel(), Boolean.valueOf(this.flushScheduled.get()));
    }

    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
        channelHandlerContext.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
        channelHandlerContext.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
        super.handlerAdded(channelHandlerContext);
    }

    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        conditionalFlush();
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelReadComplete(channelHandlerContext);
        writeAndFlush();
        this.alreadyReading = false;
    }

    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelWritabilityChanged(channelHandlerContext);
        LOG.debug("Channel {} writability changed, invoking flush", this.parent.getChannel());
        writeAndFlush();
    }

    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        LOG.debug("Channel {} initiating shutdown...", channelHandlerContext.channel());
        this.shuttingDown = true;
        LOG.debug("Cleared {} queue entries from channel {}", Long.valueOf(this.currentQueue.startShutdown(channelHandlerContext.channel())), channelHandlerContext.channel());
        scheduleFlush();
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!this.alreadyReading) {
            this.alreadyReading = true;
            this.state = PipelineState.READING;
        }
        super.channelRead(channelHandlerContext, obj);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean onMessage(OfHeader ofHeader) {
        LOG.trace("Attempting to pair message {} to a request", ofHeader);
        return this.currentQueue.pairRequest(ofHeader);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public T getHandler() {
        return this.handler;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ensureFlushing() {
        if (this.parent.getChannel().isWritable()) {
            PipelineState pipelineState = this.state;
            LOG.debug("Synchronize on pipeline state {}", pipelineState);
            switch (pipelineState) {
                case READING:
                    return;
                case WRITING:
                case IDLE:
                default:
                    scheduleFlush();
                    return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onEchoRequest(EchoRequestMessage echoRequestMessage) {
        this.parent.getChannel().writeAndFlush(makeMessageListenerWrapper(new EchoReplyInputBuilder().setData(echoRequestMessage.getData()).setVersion(echoRequestMessage.getVersion()).setXid(echoRequestMessage.getXid()).build()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeMessage(OfHeader ofHeader, long j) {
        this.parent.getChannel().write(makeMessageListenerWrapper(ofHeader));
    }

    protected Object makeMessageListenerWrapper(@Nonnull OfHeader ofHeader) {
        Preconditions.checkArgument(ofHeader != null);
        return this.address == null ? new MessageListenerWrapper(ofHeader, LOG_ENCODER_LISTENER) : new UdpMessageListenerWrapper(ofHeader, LOG_ENCODER_LISTENER, this.address);
    }

    protected void flush() {
        if (!this.shuttingDown) {
            LOG.trace("Dequeuing messages to channel {}", this.parent.getChannel());
            writeAndFlush();
            rescheduleFlush();
        } else if (this.currentQueue.finishShutdown()) {
            close();
            LOG.debug("Channel {} shutdown complete", this.parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", this.parent.getChannel());
            rescheduleFlush();
        }
    }

    private void scheduleFlush() {
        if (!this.flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Flush task is already present on channel {}", this.parent.getChannel());
        } else {
            LOG.trace("Scheduling flush task on channel {}", this.parent.getChannel());
            this.parent.getChannel().eventLoop().execute(this.flushRunnable);
        }
    }

    private void writeAndFlush() {
        this.state = PipelineState.WRITING;
        long nanoTime = System.nanoTime();
        int writeEntries = this.currentQueue.writeEntries(this.parent.getChannel(), nanoTime);
        if (writeEntries > 0) {
            LOG.trace("Flushing channel {}", this.parent.getChannel());
            this.parent.getChannel().flush();
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Flushed {} messages to channel {} in {}us", new Object[]{Integer.valueOf(writeEntries), this.parent.getChannel(), Long.valueOf(TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - nanoTime))});
        }
        this.state = PipelineState.IDLE;
    }

    private void rescheduleFlush() {
        if (!this.flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", this.parent.getChannel(), this);
        }
        conditionalFlush();
    }

    private void conditionalFlush() {
        if (!this.currentQueue.needsFlush()) {
            LOG.trace("Queue is empty, no flush needed");
        } else if (this.shuttingDown || this.parent.getChannel().isWritable()) {
            scheduleFlush();
        } else {
            LOG.debug("Channel {} is not I/O ready, not scheduling a flush", this.parent.getChannel());
        }
    }
}
