/*
 * Decompiled with CFR 0.152.
 */
package org.opendaylight.ocpjava.protocol.impl.core.connection;

import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.opendaylight.ocpjava.protocol.impl.core.connection.MessageListenerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class ChannelOutboundQueue
extends ChannelInboundHandlerAdapter {
    private static final long DEFAULT_WORKTIME_MICROS = TimeUnit.MILLISECONDS.toMicros(100L);
    private static final int WORKTIME_RECHECK_MSGS = 64;
    private static final Logger LOG = LoggerFactory.getLogger(ChannelOutboundQueue.class);
    private final Runnable flushRunnable = new Runnable(){

        @Override
        public void run() {
            ChannelOutboundQueue.this.flush();
        }
    };
    private static final AtomicIntegerFieldUpdater<ChannelOutboundQueue> FLUSH_SCHEDULED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ChannelOutboundQueue.class, "flushScheduled");
    private volatile int flushScheduled = 0;
    private final Queue<MessageHolder<?>> queue;
    private final long maxWorkTime;
    private final Channel channel;
    private final InetSocketAddress address;

    public ChannelOutboundQueue(Channel channel, int queueDepth, InetSocketAddress address) {
        Preconditions.checkArgument((queueDepth > 0 ? 1 : 0) != 0, (Object)"Queue depth has to be positive");
        this.queue = new LinkedBlockingQueue(queueDepth);
        this.channel = (Channel)Preconditions.checkNotNull((Object)channel);
        this.maxWorkTime = TimeUnit.MICROSECONDS.toNanos(DEFAULT_WORKTIME_MICROS);
        this.address = address;
    }

    public boolean enqueue(MessageHolder<?> holder) {
        LOG.trace("Enqueuing message {}", holder);
        if (this.queue.offer(holder)) {
            LOG.trace("Message enqueued");
            this.conditionalFlush();
            return true;
        }
        LOG.debug("Message queue is full");
        return false;
    }

    private void scheduleFlush(EventExecutor executor) {
        if (FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 0, 1)) {
            LOG.trace("Scheduling flush task");
            executor.execute(this.flushRunnable);
        } else {
            LOG.trace("Flush task is already present");
        }
    }

    private void conditionalFlush() {
        if (this.queue.isEmpty()) {
            LOG.trace("Queue is empty, not flush needed");
            return;
        }
        if (!this.channel.isWritable()) {
            LOG.trace("Channel {} is not writable, not issuing a flush", (Object)this.channel);
            return;
        }
        this.scheduleFlush(this.channel.pipeline().lastContext().executor());
    }

    private synchronized void flush() {
        long start = System.nanoTime();
        long deadline = start + this.maxWorkTime;
        LOG.debug("Dequeuing messages to channel {}", (Object)this.channel);
        long messages = 0L;
        while (true) {
            if (!this.channel.isWritable()) {
                LOG.trace("Channel is no longer writable");
                break;
            }
            MessageHolder<?> h = this.queue.poll();
            if (h == null) {
                LOG.trace("The queue is completely drained");
                break;
            }
            GenericFutureListener<Future<Void>> l = h.takeListener();
            ChannelFuture p = this.channel.write((Object)new MessageListenerWrapper(h.takeMessage(), l));
            if (l != null) {
                p.addListener(l);
            }
            if (messages % 64L == 0L && System.nanoTime() >= deadline) {
                LOG.trace("Exceeded allotted work time {}us", (Object)TimeUnit.NANOSECONDS.toMicros(this.maxWorkTime));
                break;
            }
            ++messages;
        }
        if (messages > 0L) {
            LOG.debug("Flushing {} message(s) to channel {}", (Object)messages, (Object)this.channel);
            this.channel.flush();
        }
        if (LOG.isDebugEnabled()) {
            long stop = System.nanoTime();
            LOG.debug("Flushed {} messages in {}us to channel {}", new Object[]{messages, TimeUnit.NANOSECONDS.toMicros(stop - start), this.channel});
        }
        if (!FLUSH_SCHEDULED_UPDATER.compareAndSet(this, 1, 0)) {
            LOG.warn("Channel {} queue {} flusher found unscheduled", (Object)this.channel, this.queue);
        }
        this.conditionalFlush();
    }

    private void conditionalFlush(ChannelHandlerContext ctx) {
        Preconditions.checkState((boolean)ctx.channel().equals(this.channel), (String)"Inconsistent channel %s with context %s", (Object[])new Object[]{this.channel, ctx});
        this.conditionalFlush();
    }

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.conditionalFlush(ctx);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
        this.conditionalFlush(ctx);
    }

    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        MessageHolder<?> e;
        super.channelInactive(ctx);
        long entries = 0L;
        LOG.debug("Channel shutdown, flushing queue...");
        ChannelFuture result = ctx.newFailedFuture((Throwable)new RejectedExecutionException("Channel disconnected"));
        while ((e = this.queue.poll()) != null) {
            e.takeListener().operationComplete((Future)result);
            ++entries;
        }
        LOG.debug("Flushed {} queue entries", (Object)entries);
    }

    public String toString() {
        return String.format("Channel %s queue [%s messages flushing=%s]", this.channel, this.queue.size(), this.flushScheduled);
    }

    public static interface MessageHolder<T> {
        public GenericFutureListener<Future<Void>> takeListener();

        public T takeMessage();
    }
}

