package org.davidmoten.rx2.io.internal;

import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.davidmoten.rx2.http.Response;
import org.davidmoten.rx2.http.WriterFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

/* loaded from: input_file:org/davidmoten/rx2/io/internal/ServletHandler.class */
/**
 * Serves a stream of {@link ByteBuffer}s produced by a {@link Publisher} over an HTTP GET and
 * lets later GET calls adjust the demand of (or cancel) a stream that is still running.
 *
 * <p>Two kinds of GET are handled by {@link #doGet}:
 * <ul>
 * <li>no {@code id} parameter: a new stream is started; the optional {@code r} parameter is the
 * initial number of items requested;</li>
 * <li>{@code id} parameter present: {@code r} is applied to the running stream registered under
 * that id (positive = request more, negative = cancel, zero = no-op).</li>
 * </ul>
 *
 * <p>Running subscriptions are tracked in a {@link ConcurrentHashMap} keyed by a random non-zero
 * id so that follow-up calls arriving on other threads can find them.
 */
public final class ServletHandler {
    /** Source of stream ids. */
    private final Random random = new Random();

    /** Subscriptions of the streams currently being served, keyed by stream id. */
    private final Map<Long, Subscription> map = new ConcurrentHashMap<>();

    /**
     * Creates a new handler with no registered streams.
     *
     * @return a new handler
     */
    public static ServletHandler create() {
        return new ServletHandler();
    }

    private ServletHandler() {
    }

    /**
     * Handles a GET call: either starts a new stream or forwards a request/cancel to an
     * existing one (see the class description).
     *
     * <p>When starting a stream the response content type is set to
     * {@code application/octet-stream}. The stream is served asynchronously only if the
     * {@link Response} asks for it and the servlet request supports async processing; otherwise
     * the calling thread blocks until the stream has finished. Anything thrown while obtaining
     * or starting the stream is reported to the client by serving an error stream instead.
     *
     * @param callable supplies the response (publisher, schedulers, factories) to be streamed
     * @param httpServletRequest the incoming request; parameters {@code id} and {@code r} are read
     * @param httpServletResponse the response the stream is written to
     * @throws ServletException declared for servlet compatibility
     * @throws IOException if the response output stream cannot be obtained
     * @throws NumberFormatException if {@code id} or {@code r} is not a valid long, or if
     *         {@code id} is present and {@code r} is missing
     */
    public void doGet(Callable<Response> callable, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws ServletException, IOException {
        String idParam = httpServletRequest.getParameter("id");
        if (idParam != null) {
            // Follow-up call against an already running stream.
            long id = Long.parseLong(idParam);
            long amount = Long.parseLong(httpServletRequest.getParameter("r"));
            handleRequest(id, amount);
            return;
        }

        long initialRequest = getRequest(httpServletRequest);
        httpServletResponse.setContentType("application/octet-stream");
        try {
            Response response = callable.call();
            if (response.isAsync() && httpServletRequest.isAsyncSupported()) {
                AsyncContext asyncContext = httpServletRequest.startAsync();
                // A timeout of zero disables the container's async timeout.
                asyncContext.setTimeout(0L);
                handleStreamNonBlocking(response.publisher(), asyncContext.getResponse().getOutputStream(), response.requestScheduler(), initialRequest, asyncContext, response.writerFactory(), response.afterOnNextFactory());
            } else {
                handleStreamBlocking(response.publisher(), httpServletResponse.getOutputStream(), response.requestScheduler(), initialRequest, response.writerFactory(), response.afterOnNextFactory());
            }
        } catch (Throwable th) {
            // Report the failure through the normal streaming path using default settings.
            handleStreamBlocking(Flowable.error(th), httpServletResponse.getOutputStream(), Schedulers.io(), initialRequest, WriterFactory.DEFAULT, AfterOnNextFactory.DEFAULT);
        }
    }

    /**
     * Serves the stream and blocks the calling thread until the completion action has run
     * (or the thread is interrupted).
     */
    private void handleStreamBlocking(Publisher<? extends ByteBuffer> publisher, OutputStream outputStream, Scheduler scheduler, long initialRequest, WriterFactory writerFactory, AfterOnNextFactory afterOnNextFactory) {
        CountDownLatch finished = new CountDownLatch(1);
        long id = nextId(this.random);
        Runnable onDone = () -> {
            this.map.remove(id);
            finished.countDown();
        };
        handleStream(publisher, outputStream, scheduler, initialRequest, id, onDone, writerFactory, afterOnNextFactory);
        waitFor(finished);
    }

    /**
     * Blocks until the latch reaches zero. If the waiting thread is interrupted the method
     * returns early and the thread's interrupt status is restored so that callers further up
     * the stack can still observe the interruption.
     *
     * @param countDownLatch the latch to wait on
     */
    @VisibleForTesting
    static void waitFor(CountDownLatch countDownLatch) {
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            // Do not lose the interruption: re-assert the flag cleared by the exception.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Serves the stream without blocking the calling thread; the async context is completed
     * by the completion action.
     */
    private void handleStreamNonBlocking(Publisher<? extends ByteBuffer> publisher, OutputStream outputStream, Scheduler scheduler, long initialRequest, AsyncContext asyncContext, WriterFactory writerFactory, AfterOnNextFactory afterOnNextFactory) {
        long id = nextId(this.random);
        Runnable onDone = () -> {
            this.map.remove(id);
            asyncContext.complete();
        };
        handleStream(publisher, outputStream, scheduler, initialRequest, id, onDone, writerFactory, afterOnNextFactory);
    }

    /**
     * Hands the stream to {@link Server#handle}, registering the subscription it reports under
     * {@code id}, and then issues the initial request if one was asked for.
     *
     * @param initialRequest number of items to request up front; nothing is requested if not positive
     * @param id the id the subscription is registered under
     * @param onDone action passed to {@code Server.handle} as the completion action
     */
    private void handleStream(Publisher<? extends ByteBuffer> publisher, OutputStream outputStream, Scheduler scheduler, long initialRequest, long id, Runnable onDone, WriterFactory writerFactory, AfterOnNextFactory afterOnNextFactory) {
        Server.handle(publisher, Single.just(outputStream), onDone, id, scheduler, subscription -> {
            this.map.put(id, subscription);
        }, writerFactory, afterOnNextFactory);
        if (initialRequest > 0) {
            // The subscription may be absent (not registered, or already removed by onDone).
            Subscription subscription = this.map.get(id);
            if (subscription != null) {
                subscription.request(initialRequest);
            }
        }
    }

    /**
     * Applies a follow-up request to the stream registered under {@code id}: a positive amount
     * requests that many more items, a negative amount cancels the stream, zero does nothing.
     * Unknown ids are ignored.
     */
    private void handleRequest(long id, long amount) {
        Subscription subscription = this.map.get(id);
        if (subscription == null) {
            return;
        }
        if (amount > 0) {
            subscription.request(amount);
        } else if (amount < 0) {
            subscription.cancel();
        }
    }

    /**
     * Reads the {@code r} parameter of the request.
     *
     * @return the parsed value, or 0 if the parameter is absent
     * @throws NumberFormatException if the parameter is present but not a valid long
     */
    private static long getRequest(HttpServletRequest httpServletRequest) {
        String value = httpServletRequest.getParameter("r");
        if (value == null) {
            return 0L;
        }
        return Long.parseLong(value);
    }

    /**
     * Cancels every subscription currently registered and then empties the registry.
     */
    public void close() {
        for (Subscription subscription : this.map.values()) {
            subscription.cancel();
        }
        this.map.clear();
    }

    /**
     * Draws values from {@code random} until a non-zero one is produced.
     *
     * @param random the generator to draw from
     * @return a random long that is never 0
     */
    @VisibleForTesting
    static long nextId(Random random) {
        long id = random.nextLong();
        while (id == 0) {
            id = random.nextLong();
        }
        return id;
    }
}
