/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.ocpjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opendaylight.ocpjava.protocol.api.connection.OutboundQueue;
import org.opendaylight.ocpjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.ocpjava.protocol.impl.core.connection.ConnectionAdapterImpl;
import org.opendaylight.ocpjava.protocol.impl.core.connection.MessageListenerWrapper;
import org.opendaylight.ocpjava.protocol.impl.core.connection.StackedOutboundQueue;
import org.opendaylight.yang.gen.v1.urn.opendaylight.ocp.common.types.rev150811.OcpHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class OutboundQueueManager<T extends OutboundQueueHandler>
extends ChannelInboundHandlerAdapter
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(OutboundQueueManager.class);
    private static final int DEFAULT_LOW_WATERMARK = 131072;
    private static final int DEFAULT_HIGH_WATERMARK = 262144;
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    private final StackedOutboundQueue currentQueue;
    private final ConnectionAdapterImpl parent;
    private final InetSocketAddress address;
    private final int maxNonBarrierMessages;
    private final T handler;
    private volatile PipelineState state = PipelineState.IDLE;
    private boolean alreadyReading;
    private boolean shuttingDown;
    private final Runnable flushRunnable = new Runnable(){

        @Override
        public void run() {
            OutboundQueueManager.this.flush();
        }
    };

    OutboundQueueManager(ConnectionAdapterImpl parent, InetSocketAddress address, T handler, int maxNonBarrierMessages) {
        this.parent = (ConnectionAdapterImpl)Preconditions.checkNotNull((Object)parent);
        this.handler = (OutboundQueueHandler)Preconditions.checkNotNull(handler);
        Preconditions.checkArgument((maxNonBarrierMessages > 0 ? 1 : 0) != 0);
        this.maxNonBarrierMessages = maxNonBarrierMessages;
        this.address = address;
        this.currentQueue = new StackedOutboundQueue(this);
        LOG.debug("Queue manager instantiated with queue {}", (Object)this.currentQueue);
        handler.onConnectionQueueChanged((OutboundQueue)this.currentQueue);
    }

    T getHandler() {
        return this.handler;
    }

    @Override
    public void close() {
        this.handler.onConnectionQueueChanged(null);
    }

    boolean onMessage(OcpHeader message) {
        LOG.trace("Attempting to pair message {} to a request", (Object)message);
        return this.currentQueue.pairRequest(message);
    }

    private void scheduleFlush() {
        if (this.flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Scheduling flush task on channel {}", (Object)this.parent.getChannel());
            this.parent.getChannel().eventLoop().execute(this.flushRunnable);
        } else {
            LOG.trace("Flush task is already present on channel {}", (Object)this.parent.getChannel());
        }
    }

    private void rescheduleFlush() {
        if (!this.flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", (Object)this.parent.getChannel(), (Object)this);
        }
        this.conditionalFlush();
    }

    private void writeAndFlush() {
        this.state = PipelineState.WRITING;
        long start = System.nanoTime();
        int entries = this.currentQueue.writeEntries(this.parent.getChannel(), start);
        if (entries > 0) {
            LOG.trace("Flushing channel {}", (Object)this.parent.getChannel());
            this.parent.getChannel().flush();
        }
        if (LOG.isDebugEnabled()) {
            long stop = System.nanoTime();
            LOG.debug("Flushed {} messages to channel {} in {}us", new Object[]{entries, this.parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start)});
        }
        this.state = PipelineState.IDLE;
    }

    protected void flush() {
        if (!this.shuttingDown) {
            LOG.trace("Dequeuing messages to channel {}", (Object)this.parent.getChannel());
            this.writeAndFlush();
            this.rescheduleFlush();
        } else if (this.currentQueue.finishShutdown()) {
            this.handler.onConnectionQueueChanged(null);
            LOG.debug("Channel {} shutdown complete", (Object)this.parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", (Object)this.parent.getChannel());
            this.rescheduleFlush();
        }
    }

    private void conditionalFlush() {
        if (this.currentQueue.needsFlush()) {
            if (this.shuttingDown || this.parent.getChannel().isWritable()) {
                this.scheduleFlush();
            } else {
                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", (Object)this.parent.getChannel());
            }
        } else {
            LOG.trace("Queue is empty, no flush needed");
        }
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.conditionalFlush();
    }

    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().config().setWriteBufferHighWaterMark(262144);
        ctx.channel().config().setWriteBufferLowWaterMark(131072);
        super.handlerAdded(ctx);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
        LOG.debug("Channel {} writability changed, invoking flush", (Object)this.parent.getChannel());
        this.writeAndFlush();
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        LOG.debug("Channel {} initiating shutdown...", (Object)ctx.channel());
        this.shuttingDown = true;
        long entries = this.currentQueue.startShutdown(ctx.channel());
        LOG.debug("Cleared {} queue entries from channel {}", (Object)entries, (Object)ctx.channel());
        this.scheduleFlush();
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!this.alreadyReading) {
            this.alreadyReading = true;
            this.state = PipelineState.READING;
        }
        super.channelRead(ctx, msg);
    }

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        this.writeAndFlush();
    }

    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", this.parent.getChannel(), this.flushScheduled.get());
    }

    void ensureFlushing() {
        if (!this.parent.getChannel().isWritable()) {
            return;
        }
        PipelineState localState = this.state;
        LOG.debug("Synchronize on pipeline state {}", (Object)localState);
        switch (localState) {
            case READING: {
                break;
            }
            default: {
                this.scheduleFlush();
            }
        }
    }

    void writeMessage(OcpHeader message, long now) {
        MessageListenerWrapper wrapper = new MessageListenerWrapper(message, null);
        this.parent.getChannel().write((Object)wrapper);
    }

    private static enum PipelineState {
        IDLE,
        READING,
        WRITING;

    }
}

