/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.http.ws.messaging;

import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.examples.http.ws.messaging.MessageFrame;
import io.reactivex.netty.examples.http.ws.messaging.PendingMessageTracker;
import io.reactivex.netty.util.UnicastBufferingSubject;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.internal.operators.BufferUntilSubscriber;
import rx.internal.operators.OperatorOnBackpressureBuffer;

public class MessageProducer {
    private final UnicastBufferingSubject<WebSocketFrame> sender;
    private final Observable<WebSocketFrame> messageStream;
    private final PendingMessageTracker messageTracker;

    public MessageProducer(int messagesToSend, long interval, TimeUnit intervalDuration) {
        BufferUntilSubscriber source = BufferUntilSubscriber.create();
        source.lift((Observable.Operator)new OperatorOnBackpressureBuffer((long)messagesToSend));
        this.sender = UnicastBufferingSubject.create((long)messagesToSend);
        this.messageTracker = new PendingMessageTracker(this.sender);
        this.messageStream = this.sender.filter(f -> f instanceof MessageFrame).cast(MessageFrame.class).map(this.messageTracker::addPendingMessage).cast(WebSocketFrame.class).concatWith(Observable.just((Object)new CloseWebSocketFrame()));
        Observable.interval((long)interval, (TimeUnit)intervalDuration).take(messagesToSend).forEach(aTick -> this.sender.onNext((Object)new MessageFrame(MessageFrame.MessageType.Message, (long)aTick)));
    }

    public Observable<WebSocketFrame> getMessageStream() {
        return this.messageStream;
    }

    public long acceptAcknowledgment(BinaryWebSocketFrame ackMessage) {
        MessageFrame frame = this.messageTracker.removePendingMessage(ackMessage);
        frame.release();
        return frame.getId();
    }
}

