package io.reactivex.netty.examples.http.ws.messaging;

import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.examples.http.ws.messaging.MessageFrame;
import io.reactivex.netty.util.UnicastBufferingSubject;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.internal.operators.BufferUntilSubscriber;
import rx.internal.operators.OperatorOnBackpressureBuffer;

/* loaded from: input_file:io/reactivex/netty/examples/http/ws/messaging/MessageProducer.class */
/**
 * Emits {@link MessageFrame}s at a fixed interval and keeps track of the ones that have been
 * handed to the outbound stream until they are acknowledged.
 *
 * <p>Frames are pushed into a {@link UnicastBufferingSubject}; every {@link MessageFrame} that
 * flows out of it is registered with a {@link PendingMessageTracker} before being exposed through
 * {@link #getMessageStream()}. Acknowledgments are matched back to their pending frame through
 * {@link #acceptAcknowledgment(BinaryWebSocketFrame)}.
 */
public class MessageProducer {
    /** Subject the interval timer pushes new frames into; also handed to the tracker. */
    private final UnicastBufferingSubject<WebSocketFrame> sender;
    /** Outbound frames: tracked message frames, then a close frame once the sender completes. */
    private final Observable<WebSocketFrame> messageStream;
    /** Registry of frames that were sent but not yet acknowledged. */
    private final PendingMessageTracker messageTracker;

    /**
     * Creates the producer and immediately starts the interval timer that generates messages.
     *
     * @param i        number of messages to produce; also used as the buffer size of the
     *                 underlying subject
     * @param j        delay between two consecutive messages, expressed in {@code timeUnit}
     * @param timeUnit unit of {@code j}
     * @throws IllegalArgumentException if {@code i} is not strictly positive
     */
    public MessageProducer(int i, long j, TimeUnit timeUnit) {
        // A producer needs room for at least one message; reject non-positive counts up front.
        if (i <= 0) {
            throw new IllegalArgumentException("Message count must be > 0 but was " + i);
        }

        this.sender = UnicastBufferingSubject.create(i);
        this.messageTracker = new PendingMessageTracker(this.sender);

        // Only MessageFrames are tracked; anything else pushed into the sender is dropped here.
        // The close frame is appended after the sender's stream completes.
        this.messageStream = this.sender
                .filter(frame -> frame instanceof MessageFrame)
                .cast(MessageFrame.class)
                .map(this.messageTracker::addPendingMessage)
                .cast(WebSocketFrame.class)
                .concatWith(Observable.just(new CloseWebSocketFrame()));

        // The tick value of the interval doubles as the message id.
        Observable.interval(j, timeUnit)
                .take(i)
                .forEach(tick -> this.sender.onNext(
                        new MessageFrame(MessageFrame.MessageType.Message, tick.longValue())));
    }

    /**
     * Returns the outbound stream of frames to write to the peer.
     *
     * @return the tracked message frames, followed by a {@link CloseWebSocketFrame}
     */
    public Observable<WebSocketFrame> getMessageStream() {
        return this.messageStream;
    }

    /**
     * Handles an acknowledgment: removes the matching pending message from the tracker, releases
     * it and reports its id.
     *
     * @param binaryWebSocketFrame acknowledgment frame received from the peer
     * @return id of the message that was acknowledged
     */
    public long acceptAcknowledgment(BinaryWebSocketFrame binaryWebSocketFrame) {
        // NOTE(review): assumes the tracker always finds a pending message for the frame; if it
        // can return null for an unknown acknowledgment this will throw a NullPointerException
        // -- confirm against PendingMessageTracker.
        MessageFrame acknowledged = this.messageTracker.removePendingMessage(binaryWebSocketFrame);
        acknowledged.release();
        return acknowledged.getId();
    }
}
