package io.reactivesocket.tckdrivers.common;

import io.reactivesocket.Payload;
import io.reactivesocket.util.PayloadImpl;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/* loaded from: input_file:io/reactivesocket/tckdrivers/common/EchoSubscription.class */
/**
 * A {@link Subscription} whose items are supplied externally through {@link #add(Tuple)}.
 *
 * <p>Tuples handed to {@code add} are buffered in a queue and delivered to the wrapped
 * subscriber, each converted to a {@link PayloadImpl}, only while the subscriber has
 * outstanding demand signalled via {@link #request(long)}.
 *
 * <p>Thread-safety: the demand counters are guarded by this object's monitor, the queue is a
 * concurrent queue, and the cancellation flag is {@code volatile}. Note that
 * {@code onNext} is invoked while the monitor is held.
 */
public class EchoSubscription implements Subscription {
    /** Subscriber that receives the echoed payloads. */
    private final Subscriber<? super Payload> sub;
    /** Items waiting for demand; producers may enqueue from any thread. */
    private final Queue<Tuple<String, String>> q = new ConcurrentLinkedQueue<>();
    /** Number of items already handed to the subscriber; guarded by {@code this}. */
    private long numSent = 0;
    /** Total demand signalled so far, saturating at {@code Long.MAX_VALUE}; guarded by {@code this}. */
    private long numRequested = 0;
    /** Set by {@link #cancel()}; volatile so the draining thread observes it promptly. */
    private volatile boolean cancelled = false;

    /**
     * Creates a subscription that delivers to the given subscriber.
     *
     * @param subscriber the subscriber that will receive the queued items
     */
    public EchoSubscription(Subscriber<? super Payload> subscriber) {
        this.sub = subscriber;
    }

    /**
     * Enqueues an item and delivers it immediately if the subscriber has outstanding demand.
     *
     * @param tuple the pair whose two components are passed to the {@link PayloadImpl} constructor
     */
    public void add(Tuple<String, String> tuple) {
        this.q.add(tuple);
        // Always drain under the lock: reading the demand counters here without
        // synchronization could observe stale values and leave the item stuck.
        drain();
    }

    /**
     * Adds {@code j} to the outstanding demand and delivers as many queued items as allowed.
     *
     * <p>A negative amount is rejected by signalling {@code onError} with an
     * {@link IllegalArgumentException} and cancelling the subscription. Zero adds no demand
     * and merely triggers a drain, which keeps existing callers working.
     *
     * @param j the number of additional items requested
     */
    @Override // org.reactivestreams.Subscription
    public synchronized void request(long j) {
        if (j < 0) {
            if (!this.cancelled) {
                this.cancelled = true;
                this.sub.onError(new IllegalArgumentException(
                        "Subscription.request must be called with a non-negative amount, got " + j));
            }
            return;
        }
        // Saturating add: request(Long.MAX_VALUE) on top of existing demand must not wrap negative.
        long total = this.numRequested + j;
        this.numRequested = total < 0 ? Long.MAX_VALUE : total;
        drain();
    }

    /** Stops any further delivery; items still queued are not sent. */
    @Override // org.reactivestreams.Subscription
    public void cancel() {
        this.cancelled = true;
    }

    /** Delivers queued items while there is demand and the subscription is not cancelled. */
    private synchronized void drain() {
        while (!this.cancelled && this.numSent < this.numRequested) {
            Tuple<String, String> next = this.q.poll();
            if (next == null) {
                return;
            }
            // Count the item before calling onNext so that a re-entrant request() made from
            // inside onNext sees up-to-date accounting and cannot over-deliver.
            this.numSent++;
            System.out.println("Sending ... " + next);
            this.sub.onNext(new PayloadImpl(next.getK(), next.getV()));
        }
    }
}
