/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.examples.http.ws.messaging;

import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.reactivex.netty.examples.http.ws.messaging.MessageFrame;
import io.reactivex.netty.util.UnicastBufferingSubject;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import rx.Observable;

/**
 * Keeps the ids of messages that were registered as pending and not yet removed, and
 * periodically emits a new {@link MessageFrame} for each of those ids to a sender subject.
 *
 * <p>Ids are held in a {@link ConcurrentLinkedQueue}. Every
 * {@value #RESEND_INTERVAL_SECONDS} seconds the queue is drained with {@code poll()} and, for
 * each drained id, a {@code MessageFrame} of type {@code MessageType.Message} carrying that id
 * is pushed to the sender. Because draining removes the id, this class stops tracking it
 * unless {@link #addPendingMessage(MessageFrame)} is called for it again.
 *
 * <p>NOTE(review): the value returned by {@code forEach} is discarded, so this class exposes
 * no way to stop the periodic timer once an instance has been created - confirm that this is
 * acceptable for the lifetime of the owning connection.
 */
public class PendingMessageTracker {
    /** Period, in seconds, between two drains of the unacknowledged-id queue. */
    private static final long RESEND_INTERVAL_SECONDS = 10L;

    /** Ids added via {@link #addPendingMessage} and not yet removed or drained. */
    private final ConcurrentLinkedQueue<Long> unacknowledgedIds = new ConcurrentLinkedQueue<>();

    /**
     * Creates a tracker and immediately starts the periodic drain of pending ids.
     *
     * @param sender subject that receives one new {@code MessageFrame} per id drained from
     *               the queue on every tick
     */
    public PendingMessageTracker(UnicastBufferingSubject<WebSocketFrame> sender) {
        Observable.interval(RESEND_INTERVAL_SECONDS, TimeUnit.SECONDS).forEach(aTick -> {
            Long unacked;
            // poll() both reads and removes, so each id is emitted at most once per tick.
            while ((unacked = this.unacknowledgedIds.poll()) != null) {
                sender.onNext(new MessageFrame(MessageFrame.MessageType.Message, unacked));
            }
        });
    }

    /**
     * Records the id of the given frame as pending.
     *
     * @param messageFrame frame whose id is appended to the pending queue
     * @return the same {@code messageFrame} instance, unchanged
     */
    public MessageFrame addPendingMessage(MessageFrame messageFrame) {
        this.unacknowledgedIds.add(messageFrame.getId());
        return messageFrame;
    }

    /**
     * Builds a {@code MessageFrame} from the content of the given binary frame and removes
     * that frame's id from the pending queue, if present.
     *
     * @param bFrame binary frame whose content is used to construct the {@code MessageFrame}
     * @return the {@code MessageFrame} constructed from {@code bFrame.content()}
     */
    public MessageFrame removePendingMessage(BinaryWebSocketFrame bFrame) {
        MessageFrame mf = new MessageFrame(bFrame.content());
        this.unacknowledgedIds.remove(mf.getId());
        return mf;
    }
}

