/*
 * Decompiled with CFR 0.152.
 */
package org.kaazing.k3po.driver.internal.netty.bootstrap.bbosh;

import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.kaazing.k3po.driver.internal.channel.Channels;
import org.kaazing.k3po.driver.internal.netty.bootstrap.channel.AbstractChannelSink;
import org.kaazing.k3po.driver.internal.netty.bootstrap.http.HttpChannelConfig;
import org.kaazing.k3po.driver.internal.netty.bootstrap.http.HttpChildChannel;

public class BBoshPollingChildChannelSink
extends AbstractChannelSink {
    private final Deque<MessageEvent> messages;
    private AtomicInteger nextSequenceNo;
    private AtomicReference<HttpChildChannel> httpChannelRef;
    private final Runnable flushTask = new Runnable(){

        @Override
        public void run() {
            HttpChildChannel httpChannel = (HttpChildChannel)((Object)BBoshPollingChildChannelSink.this.httpChannelRef.get());
            if (!BBoshPollingChildChannelSink.this.messages.isEmpty()) {
                while (!BBoshPollingChildChannelSink.this.messages.isEmpty()) {
                    MessageEvent head = (MessageEvent)BBoshPollingChildChannelSink.this.messages.removeFirst();
                    ChannelBuffer message = (ChannelBuffer)head.getMessage();
                    int readableBytes = message.readableBytes();
                    ChannelFuture future = head.getFuture();
                    ChannelFuture httpFuture = httpChannel.write(message);
                    Channels.chainWriteCompletes(httpFuture, future, readableBytes);
                }
                BBoshPollingChildChannelSink.this.httpChannelRef.compareAndSet(httpChannel, null);
                httpChannel.close();
            }
        }
    };

    public BBoshPollingChildChannelSink(int nextSequenceNo) {
        this.nextSequenceNo = new AtomicInteger(nextSequenceNo);
        this.httpChannelRef = new AtomicReference();
        this.messages = new ConcurrentLinkedDeque<MessageEvent>();
    }

    @Override
    protected void writeRequested(ChannelPipeline pipeline, MessageEvent e) throws Exception {
        this.messages.addLast(e);
        HttpChildChannel httpChannel = this.httpChannelRef.get();
        if (httpChannel != null) {
            ChannelPipeline httpPipeline = httpChannel.getPipeline();
            httpPipeline.execute(this.flushTask);
        }
    }

    public ChannelFuture attach(int sequenceNo, HttpChildChannel httpChannel) {
        ChannelFuture httpFuture = org.jboss.netty.channel.Channels.future((Channel)httpChannel);
        this.attach(sequenceNo, httpChannel, httpFuture);
        return httpFuture;
    }

    public void detach(HttpChildChannel httpChannel) {
        if (httpChannel.isOpen()) {
            assert (httpChannel == this.httpChannelRef.get());
            this.httpChannelRef.set(null);
            httpChannel.close();
        }
    }

    private void attach(final int sequenceNo, final HttpChildChannel httpChannel, final ChannelFuture httpFuture) {
        if (sequenceNo < this.nextSequenceNo.get()) {
            String message = String.format("Replayed sequence number: %d", sequenceNo);
            ChannelException exception = new ChannelException(message);
            exception.fillInStackTrace();
            httpFuture.setFailure((Throwable)exception);
        } else if (this.nextSequenceNo.compareAndSet(sequenceNo, sequenceNo + 1)) {
            HttpChannelConfig httpConfig = (HttpChannelConfig)httpChannel.getConfig();
            HttpHeaders httpHeaders = httpConfig.getWriteHeaders();
            httpHeaders.set("Cache-Control", (Object)"no-cache");
            httpHeaders.set("Content-Type", (Object)"application/octet-stream");
            httpConfig.setMaximumBufferedContentLength(8192);
            this.httpChannelRef.set(httpChannel);
            this.flushTask.run();
            httpFuture.setSuccess();
        } else {
            Runnable reorderTask = new Runnable(){

                @Override
                public void run() {
                    BBoshPollingChildChannelSink.this.attach(sequenceNo, httpChannel, httpFuture);
                }
            };
            ChannelPipeline httpPipeline = httpChannel.getPipeline();
            httpPipeline.execute(reorderTask);
        }
    }
}

