package org.davidmoten.rx2.io.internal;

import io.reactivex.Scheduler;
import io.reactivex.SingleObserver;
import io.reactivex.SingleSource;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.internal.fuseable.SimplePlainQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.internal.util.BackpressureHelper;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.davidmoten.rx2.http.Writer;
import org.davidmoten.rx2.http.WriterFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/davidmoten/rx2/io/internal/Server.class */
public final class Server {

    /* loaded from: input_file:org/davidmoten/rx2/io/internal/Server$HandlerSubscriber.class */
    private static final class HandlerSubscriber extends AtomicInteger implements Subscriber<ByteBuffer>, Subscription, SingleObserver<OutputStream> {
        private static final Logger log = LoggerFactory.getLogger(HandlerSubscriber.class);
        private static final long serialVersionUID = 1331107616659478552L;
        private final SingleSource<OutputStream> outSource;
        private final Runnable completion;
        private final long id;
        private final Scheduler.Worker worker;
        private final WriterFactory writerFactory;
        private final AfterOnNext afterOnNext;
        private Subscription parent;
        private volatile boolean finished;
        private Throwable error;
        private volatile boolean cancelled;
        private Disposable disposable;
        private Writer writer;
        private long emitted;
        private final AtomicLong requested = new AtomicLong();
        boolean firstOnNext = true;
        private SimplePlainQueue<ByteBuffer> queue = new SpscLinkedArrayQueue(16);

        HandlerSubscriber(SingleSource<OutputStream> singleSource, Runnable runnable, long j, Scheduler scheduler, WriterFactory writerFactory, AfterOnNext afterOnNext) {
            this.outSource = singleSource;
            this.completion = runnable;
            this.id = j;
            this.writerFactory = writerFactory;
            this.worker = scheduler.createWorker();
            this.afterOnNext = afterOnNext;
        }

        public void onSubscribe(Subscription subscription) {
            this.parent = subscription;
            this.outSource.subscribe(this);
            log.debug("subscribed to source");
        }

        public void onSubscribe(Disposable disposable) {
            this.disposable = disposable;
        }

        public void onSuccess(OutputStream outputStream) {
            try {
                this.writer = this.writerFactory.createWriter(outputStream);
                this.writer.write(Util.toBytes(this.id));
                this.writer.flush();
            } catch (IOException e) {
                this.error = e;
                this.finished = true;
            }
            drain();
        }

        public void request(long j) {
            log.debug("server request id={}, n={}", Long.valueOf(this.id), Long.valueOf(j));
            BackpressureHelper.add(this.requested, j);
            this.worker.schedule(() -> {
                this.parent.request(j);
                drain();
            });
        }

        public void cancel() {
            this.cancelled = true;
            this.disposable.dispose();
            this.parent.cancel();
            this.worker.dispose();
        }

        public void onNext(ByteBuffer byteBuffer) {
            this.queue.offer(byteBuffer);
            drain();
        }

        public void onError(Throwable th) {
            this.error = th;
            this.finished = true;
            drain();
        }

        public void onComplete() {
            this.finished = true;
            drain();
        }

        private void drain() {
            if (getAndIncrement() == 0) {
                int i = 1;
                do {
                    long j = this.requested.get();
                    long j2 = this.emitted;
                    while (!this.cancelled) {
                        boolean z = this.finished;
                        ByteBuffer byteBuffer = (ByteBuffer) this.queue.poll();
                        if (byteBuffer != null) {
                            try {
                                j2++;
                                writeOnNext(byteBuffer, j2 == j);
                            } catch (Throwable th) {
                                this.parent.cancel();
                                this.queue.clear();
                                this.worker.dispose();
                                if (!this.cancelled) {
                                    writeError(th);
                                }
                                this.completion.run();
                                return;
                            }
                        } else {
                            if (z) {
                                Throwable th2 = this.error;
                                if (th2 == null) {
                                    doOnComplete();
                                    this.completion.run();
                                    return;
                                }
                                this.error = null;
                                this.parent.cancel();
                                this.queue.clear();
                                this.worker.dispose();
                                if (!this.cancelled) {
                                    writeError(th2);
                                }
                                this.completion.run();
                                return;
                            }
                            this.emitted = j2;
                            i = addAndGet(-i);
                        }
                    }
                    this.parent.cancel();
                    this.queue.clear();
                    this.error = null;
                    this.worker.dispose();
                    this.completion.run();
                    return;
                } while (i != 0);
            }
        }

        private void doOnComplete() {
            log.debug("server: onComplete");
            try {
                Server.writeInt(this.writer, Integer.MIN_VALUE);
                this.writer.flush();
            } catch (IOException e) {
                RxJavaPlugins.onError(e);
            }
        }

        private void writeError(Throwable th) {
            log.debug("server: onError", th);
            try {
                NoCopyByteArrayOutputStream noCopyByteArrayOutputStream = new NoCopyByteArrayOutputStream(4096);
                th.printStackTrace(new PrintStream((OutputStream) noCopyByteArrayOutputStream, true, "UTF-8"));
                noCopyByteArrayOutputStream.close();
                Server.writeInt(this.writer, -noCopyByteArrayOutputStream.size());
                this.writer.write(noCopyByteArrayOutputStream.buffer(), 0, noCopyByteArrayOutputStream.size());
                this.writer.flush();
            } catch (IOException e) {
                if (e instanceof EOFException) {
                    return;
                }
                RxJavaPlugins.onError(e);
            }
        }

        private void writeOnNext(ByteBuffer byteBuffer, boolean z) throws IOException {
            if (this.firstOnNext) {
                log.debug("server: first onNext");
                this.firstOnNext = false;
            }
            Server.writeInt(this.writer, byteBuffer.remaining());
            this.writer.write(byteBuffer);
            if (z || this.afterOnNext.flushRequested(byteBuffer.remaining())) {
                this.writer.flush();
            }
        }
    }

    private Server() {
    }

    public static void handle(Publisher<? extends ByteBuffer> publisher, SingleSource<OutputStream> singleSource, Runnable runnable, long j, Scheduler scheduler, Consumer<Subscription> consumer, WriterFactory writerFactory, AfterOnNextFactory afterOnNextFactory) {
        HandlerSubscriber handlerSubscriber = new HandlerSubscriber(singleSource, runnable, j, scheduler, writerFactory, afterOnNextFactory.create());
        try {
            consumer.accept(handlerSubscriber);
            publisher.subscribe(handlerSubscriber);
        } catch (Exception e) {
            throw new RuntimeException("subscription consumer threw", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void writeInt(Writer writer, int i) throws IOException {
        writer.write((i >>> 24) & 255);
        writer.write((i >>> 16) & 255);
        writer.write((i >>> 8) & 255);
        writer.write((i >>> 0) & 255);
    }
}
