/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.openflowjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.opendaylight.openflowjava.protocol.api.connection.OutboundQueueHandler;
import org.opendaylight.openflowjava.protocol.impl.core.connection.AbstractStackedOutboundQueue;
import org.opendaylight.openflowjava.protocol.impl.core.connection.ConnectionAdapterImpl;
import org.opendaylight.openflowjava.protocol.impl.core.connection.MessageListenerWrapper;
import org.opendaylight.openflowjava.protocol.impl.core.connection.UdpMessageListenerWrapper;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInput;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoReplyInputBuilder;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.EchoRequestMessage;
import org.opendaylight.yang.gen.v1.urn.opendaylight.openflow.protocol.rev130731.OfHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty inbound handler that drains a stacked outbound queue into the channel
 * owned by a {@link ConnectionAdapterImpl}.
 *
 * <p>The manager reacts to channel life-cycle callbacks (active, read complete,
 * writability changed, inactive) and to explicit {@link #ensureFlushing()} requests,
 * writing queued entries to the channel either inline or through a task scheduled
 * on the channel's event loop.
 *
 * @param <T> type of the handler notified about the queue this manager owns
 * @param <O> concrete stacked outbound queue type created by the subclass
 */
abstract class AbstractOutboundQueueManager<T extends OutboundQueueHandler, O extends AbstractStackedOutboundQueue>
        extends ChannelInboundHandlerAdapter
        implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractOutboundQueueManager.class);

    /** Low write-buffer watermark (128 KiB) installed on the channel in {@link #handlerAdded}. */
    private static final int DEFAULT_LOW_WATERMARK = 128 * 1024;

    /** High write-buffer watermark (256 KiB) installed on the channel in {@link #handlerAdded}. */
    private static final int DEFAULT_HIGH_WATERMARK = DEFAULT_LOW_WATERMARK * 2;

    /** Listener attached to every outgoing wrapper; logs a warning when the write future failed. */
    private static final GenericFutureListener<Future<Void>> LOG_ENCODER_LISTENER =
            new GenericFutureListener<Future<Void>>() {
        private final Logger LOGGER = LoggerFactory.getLogger("LogEncoderListener");

        @Override
        public void operationComplete(Future<Void> future) throws Exception {
            final Throwable cause = future.cause();
            if (cause != null) {
                this.LOGGER.warn("Message encoding fail !", cause);
            }
        }
    };

    /** Phases of the pipeline as tracked by this manager. */
    private enum PipelineState {
        /** Neither a read nor a queue write is in progress. */
        IDLE,
        /** Set by {@link #channelRead}; the following read-complete callback writes the queue. */
        READING,
        /** Set for the duration of {@link #writeAndFlush()}. */
        WRITING;
    }

    /** True while a flush task is pending on the event loop; guards against double scheduling. */
    private final AtomicBoolean flushScheduled = new AtomicBoolean();
    protected final ConnectionAdapterImpl parent;
    /** Remote address used for UDP wrappers; {@code null} selects the plain wrapper. */
    protected final InetSocketAddress address;
    protected final O currentQueue;
    private final T handler;

    // Volatile: read by ensureFlushing(), which is presumably invoked off the Netty thread.
    private volatile PipelineState state = PipelineState.IDLE;

    // Plain fields: only touched from channel callbacks and the flush task.
    private boolean alreadyReading;
    protected boolean shuttingDown;

    /** Task submitted to the channel's event loop to perform a single {@link #flush()}. */
    protected final Runnable flushRunnable = new Runnable() {
        @Override
        public void run() {
            AbstractOutboundQueueManager.this.flush();
        }
    };

    /**
     * Creates the manager, builds its queue and announces that queue to the handler.
     *
     * <p>NOTE: {@link #initializeStackedOutboudnqueue()} is invoked from this constructor,
     * so subclass implementations must not rely on their own fields being initialized.
     *
     * @param parent connection adapter owning the channel, must not be null
     * @param address remote address for UDP transport, or {@code null}
     * @param handler handler notified about queue changes, must not be null
     * @throws NullPointerException if {@code parent} or {@code handler} is null
     */
    AbstractOutboundQueueManager(ConnectionAdapterImpl parent, InetSocketAddress address, T handler) {
        this.parent = Preconditions.checkNotNull(parent);
        // checkNotNull is generic, so the result is already of type T; no cast is needed
        // (casting to OutboundQueueHandler would not be assignable to T).
        this.handler = Preconditions.checkNotNull(handler);
        this.address = address;
        this.currentQueue = this.initializeStackedOutboudnqueue();
        LOG.debug("Queue manager instantiated with queue {}", this.currentQueue);
        handler.onConnectionQueueChanged(this.currentQueue);
    }

    /**
     * Creates the queue this manager will drain. Called exactly once, from the constructor.
     *
     * @return the queue instance to manage
     */
    protected abstract O initializeStackedOutboudnqueue();

    /** Tells the handler that no queue is available any more. */
    @Override
    public void close() {
        this.handler.onConnectionQueueChanged(null);
    }

    @Override
    public String toString() {
        return String.format("Channel %s queue [flushing=%s]", this.parent.getChannel(), this.flushScheduled.get());
    }

    /**
     * Installs the write-buffer watermarks on the channel.
     *
     * <p>The high watermark is set first on purpose: Netty validates each value against
     * the other one, and raising the low watermark above the channel's current high
     * watermark would be rejected.
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().config().setWriteBufferHighWaterMark(DEFAULT_HIGH_WATERMARK);
        ctx.channel().config().setWriteBufferLowWaterMark(DEFAULT_LOW_WATERMARK);
        super.handlerAdded(ctx);
    }

    /** Schedules a flush if entries were queued before the channel became active. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.conditionalFlush();
    }

    /** Writes queued entries once the current read batch is finished and ends the reading phase. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        this.writeAndFlush();
        this.alreadyReading = false;
    }

    /** Attempts a write whenever the channel's writability flips. */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
        LOG.debug("Channel {} writability changed, invoking flush", this.parent.getChannel());
        this.writeAndFlush();
    }

    /** Starts queue shutdown and schedules a flush task to finish it. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        LOG.debug("Channel {} initiating shutdown...", ctx.channel());
        this.shuttingDown = true;
        final long entries = this.currentQueue.startShutdown(ctx.channel());
        LOG.debug("Cleared {} queue entries from channel {}", entries, ctx.channel());
        this.scheduleFlush();
    }

    /** Marks the pipeline as reading on the first message of a read batch, then forwards the message. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!this.alreadyReading) {
            this.alreadyReading = true;
            this.state = PipelineState.READING;
        }
        super.channelRead(ctx, msg);
    }

    /**
     * Offers an incoming message to the queue so it can be paired with an outstanding request.
     *
     * @param message message to pair
     * @return the result of {@code pairRequest} on the current queue
     */
    boolean onMessage(OfHeader message) {
        LOG.trace("Attempting to pair message {} to a request", message);
        return this.currentQueue.pairRequest(message);
    }

    /** @return the handler supplied at construction time */
    T getHandler() {
        return this.handler;
    }

    /**
     * Requests that queued entries get flushed. Does nothing while the channel is not
     * writable, and skips scheduling while a read is in progress because
     * {@link #channelReadComplete} writes the queue itself.
     */
    void ensureFlushing() {
        if (!this.parent.getChannel().isWritable()) {
            return;
        }
        // Read the volatile field once so the logged and the tested value agree.
        final PipelineState localState = this.state;
        LOG.debug("Synchronize on pipeline state {}", localState);
        if (localState != PipelineState.READING) {
            this.scheduleFlush();
        }
    }

    /**
     * Answers an echo request immediately, bypassing the queue.
     *
     * @param message echo request whose data, version and xid are mirrored in the reply
     */
    void onEchoRequest(EchoRequestMessage message) {
        final EchoReplyInput reply = new EchoReplyInputBuilder()
                .setData(message.getData())
                .setVersion(message.getVersion())
                .setXid(message.getXid())
                .build();
        this.parent.getChannel().writeAndFlush(this.makeMessageListenerWrapper(reply));
    }

    /**
     * Writes a single message to the channel without flushing it.
     *
     * @param message message to write
     * @param now timestamp supplied by the caller; not used by this implementation
     */
    void writeMessage(OfHeader message, long now) {
        final Object wrapper = this.makeMessageListenerWrapper(message);
        this.parent.getChannel().write(wrapper);
    }

    /**
     * Wraps a message together with the failure-logging listener, choosing the UDP
     * variant when a remote address was configured.
     *
     * @param msg message to wrap, must not be null
     * @return wrapper object suitable for writing to the channel
     * @throws IllegalArgumentException if {@code msg} is null
     */
    protected Object makeMessageListenerWrapper(@Nonnull OfHeader msg) {
        Preconditions.checkArgument(msg != null);
        if (this.address == null) {
            return new MessageListenerWrapper(msg, LOG_ENCODER_LISTENER);
        }
        return new UdpMessageListenerWrapper(msg, LOG_ENCODER_LISTENER, this.address);
    }

    /**
     * Performs one flush pass; this is the body of {@link #flushRunnable}.
     *
     * <p>In normal operation it writes queued entries and re-arms the flush task if needed.
     * During shutdown it closes the manager once the queue reports the shutdown finished,
     * otherwise it re-arms the task to try again.
     */
    protected void flush() {
        if (!this.shuttingDown) {
            LOG.trace("Dequeuing messages to channel {}", this.parent.getChannel());
            this.writeAndFlush();
            this.rescheduleFlush();
        } else if (this.currentQueue.finishShutdown()) {
            this.close();
            LOG.debug("Channel {} shutdown complete", this.parent.getChannel());
        } else {
            LOG.trace("Channel {} current queue not completely flushed yet", this.parent.getChannel());
            this.rescheduleFlush();
        }
    }

    /** Submits {@link #flushRunnable} to the event loop unless one is already pending. */
    private void scheduleFlush() {
        if (this.flushScheduled.compareAndSet(false, true)) {
            LOG.trace("Scheduling flush task on channel {}", this.parent.getChannel());
            this.parent.getChannel().eventLoop().execute(this.flushRunnable);
        } else {
            LOG.trace("Flush task is already present on channel {}", this.parent.getChannel());
        }
    }

    /** Writes queue entries to the channel and flushes the channel if anything was written. */
    private void writeAndFlush() {
        this.state = PipelineState.WRITING;
        final long start = System.nanoTime();
        final int entries = this.currentQueue.writeEntries(this.parent.getChannel(), start);
        if (entries > 0) {
            LOG.trace("Flushing channel {}", this.parent.getChannel());
            this.parent.getChannel().flush();
        }
        // Guarded so the second nanoTime() call is only paid for when debug logging is on.
        if (LOG.isDebugEnabled()) {
            final long stop = System.nanoTime();
            LOG.debug("Flushed {} messages to channel {} in {}us",
                    new Object[]{entries, this.parent.getChannel(), TimeUnit.NANOSECONDS.toMicros(stop - start)});
        }
        this.state = PipelineState.IDLE;
    }

    /** Clears the pending-flush flag and schedules another flush if the queue still needs one. */
    private void rescheduleFlush() {
        if (!this.flushScheduled.compareAndSet(true, false)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", this.parent.getChannel(), this);
        }
        this.conditionalFlush();
    }

    /**
     * Schedules a flush when the queue needs one and the channel can take it.
     * Writability is ignored during shutdown so the shutdown can still be finished.
     */
    private void conditionalFlush() {
        if (this.currentQueue.needsFlush()) {
            if (this.shuttingDown || this.parent.getChannel().isWritable()) {
                this.scheduleFlush();
            } else {
                LOG.debug("Channel {} is not I/O ready, not scheduling a flush", this.parent.getChannel());
            }
        } else {
            LOG.trace("Queue is empty, no flush needed");
        }
    }
}

