package com.twitter.finagle.stream;

import com.twitter.concurrent.Broker;
import com.twitter.concurrent.Offer;
import com.twitter.util.Promise;
import com.twitter.util.Return;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import scala.Predef$;
import scala.ScalaObject;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: DuplexStreamCodec.scala */
@ScalaSignature(bytes = "\u0006\u0001i3a!\u0001\u0002\u0002\u0002\tQ!\u0001\u0006\"vM\u001a,'\u000fV8DQ\u0006tg.\u001a7D_\u0012,7M\u0003\u0002\u0004\t\u000511\u000f\u001e:fC6T!!\u0002\u0004\u0002\u000f\u0019Lg.Y4mK*\u0011q\u0001C\u0001\bi^LG\u000f^3s\u0015\u0005I\u0011aA2p[N\u0019\u0001aC\f\u0011\u00051)R\"A\u0007\u000b\u00059y\u0011aB2iC:tW\r\u001c\u0006\u0003!E\tQA\\3uifT!AE\n\u0002\u000b)\u0014wn]:\u000b\u0003Q\t1a\u001c:h\u0013\t1RB\u0001\u000bTS6\u0004H.Z\"iC:tW\r\u001c%b]\u0012dWM\u001d\t\u00031mi\u0011!\u0007\u0006\u00025\u0005)1oY1mC&\u0011A$\u0007\u0002\f'\u000e\fG.Y(cU\u0016\u001cG\u000fC\u0003\u001f\u0001\u0011\u0005\u0001%\u0001\u0004=S:LGOP\u0002\u0001)\u0005\t\u0003C\u0001\u0012\u0001\u001b\u0005\u0011\u0001B\u0002\u0013\u0001A\u0003%Q%A\u0004j]\n|WO\u001c3\u0011\u0007\u0019J3&D\u0001(\u0015\tAc!\u0001\u0006d_:\u001cWO\u001d:f]RL!AK\u0014\u0003\r\t\u0013xn[3s!\tas&D\u0001.\u0015\tqs\"\u0001\u0004ck\u001a4WM]\u0005\u0003a5\u0012Qb\u00115b]:,GNQ;gM\u0016\u0014\bB\u0002\u001a\u0001A\u0003%1'A\u0004p]\u000ecwn]3\u0011\u0007Q:\u0014(D\u00016\u0015\t1d!\u0001\u0003vi&d\u0017B\u0001\u001d6\u0005\u001d\u0001&o\\7jg\u0016\u0004\"\u0001\u0007\u001e\n\u0005mJ\"\u0001B+oSRDQ!\u0010\u0001\u0005By\nQb\u00195b]:,Gn\u00117pg\u0016$GcA\u001d@\t\")\u0001\t\u0010a\u0001\u0003\u0006\u00191\r\u001e=\u0011\u00051\u0011\u0015BA\"\u000e\u0005U\u0019\u0005.\u00198oK2D\u0015M\u001c3mKJ\u001cuN\u001c;fqRDQ!\u0012\u001fA\u0002\u0019\u000b\u0011!\u001a\t\u0003\u0019\u001dK!\u0001S\u0007\u0003#\rC\u0017M\u001c8fYN#\u0018\r^3Fm\u0016tG\u000fC\u0003K\u0001\u0011\u00053*A\bnKN\u001c\u0018mZ3SK\u000e,\u0017N^3e)\rID*\u0014\u0005\u0006\u0001&\u0003\r!\u0011\u0005\u0006\u000b&\u0003\rA\u0014\t\u0003\u0019=K!\u0001U\u0007\u0003\u00195+7o]1hK\u00163XM\u001c;\t\u000bI\u0003A\u0011I*\u0002\u001d]\u0014\u0018\u000e^3SKF,Xm\u001d;fIR\u0019\u0011\bV+\t\u000b\u0001\u000b\u0006\u0019A!\t\u000b\u0015\u000b\u0006\u0019\u0001(\t\r]\u0003\u0001\u0015\"\u0005Y\u0003I\u0019XM\u001c3IC:$G.Z+qgR\u0014X-Y7\u0015\u0005eJ\u0006\"\u0002!W\u0001\u0004\t\u0005")
/* loaded from: input_file:com/twitter/finagle/stream/BufferToChannelCodec.class */
public abstract class BufferToChannelCodec extends SimpleChannelHandler implements ScalaObject {
    private final Broker<ChannelBuffer> inbound = new Broker<>();
    private final Promise<BoxedUnit> onClose = new Promise<>();

    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) {
        this.onClose.update(new Return(BoxedUnit.UNIT));
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
        Object message = messageEvent.getMessage();
        if (!(message instanceof ChannelBuffer)) {
            throw new IllegalArgumentException(Predef$.MODULE$.augmentString("Unexpected message type sent upstream: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{message.getClass().toString()})));
        }
        this.inbound.$bang((ChannelBuffer) message);
    }

    public void writeRequested(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
        Object message = messageEvent.getMessage();
        if (!(message instanceof Offer)) {
            throw new IllegalArgumentException(Predef$.MODULE$.augmentString("Unexpected message type sent downstream: %s").format(Predef$.MODULE$.genericWrapArray(new Object[]{message.getClass().toString()})));
        }
        messageEvent.getFuture().setSuccess();
        ((Offer) message).foreach(new BufferToChannelCodec$$anonfun$writeRequested$1(this, channelHandlerContext));
    }

    public void sendHandleUpstream(ChannelHandlerContext channelHandlerContext) {
        Channels.fireMessageReceived(channelHandlerContext, new DuplexStreamHandle(this.inbound.recv(), this.onClose, new BufferToChannelCodec$$anonfun$1(this, channelHandlerContext)));
    }

    public final void close$2(ChannelHandlerContext channelHandlerContext) {
        Channels.close(channelHandlerContext.getChannel());
    }
}
