package org.davidmoten.rx2.io.internal;

import com.github.davidmoten.guavamini.annotations.VisibleForTesting;
import io.reactivex.Flowable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.BiConsumer;
import io.reactivex.internal.subscriptions.SubscriptionHelper;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/davidmoten/rx2/io/internal/FlowableFromInputStream.class */
public final class FlowableFromInputStream extends Flowable<ByteBuffer> {
    private static final Logger log = LoggerFactory.getLogger(FlowableFromInputStream.class);
    private final InputStream in;
    private final BiConsumer<Long, Long> requester;

    @VisibleForTesting
    /* loaded from: input_file:org/davidmoten/rx2/io/internal/FlowableFromInputStream$FromStreamSubscription.class */
    static final class FromStreamSubscription extends AtomicInteger implements Subscription {
        private static final long serialVersionUID = 5917186677331992560L;
        private final InputStream in;
        private final Subscriber<? super ByteBuffer> child;
        private final BiConsumer<Long, Long> requester;
        private byte[] buffer;
        private int bufferIndex;
        private volatile Throwable error;
        private static final Logger log = LoggerFactory.getLogger(FromStreamSubscription.class);
        private static final long ID_UNKNOWN = 0;
        private static final IdRequested HAVE_NOT_READ_ID = new IdRequested(ID_UNKNOWN, ID_UNKNOWN);
        private int length = 0;
        private final AtomicReference<IdRequested> requested = new AtomicReference<>(HAVE_NOT_READ_ID);

        FromStreamSubscription(InputStream inputStream, BiConsumer<Long, Long> biConsumer, Subscriber<? super ByteBuffer> subscriber) {
            this.in = inputStream;
            this.requester = biConsumer;
            this.child = subscriber;
        }

        void start() {
            log.debug("calling child.onSubscribe");
            this.child.onSubscribe(this);
        }

        /* JADX WARN: Code restructure failed: missing block: B:14:0x008d, code lost:
        
            r15 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:15:0x008f, code lost:
        
            io.reactivex.exceptions.Exceptions.throwIfFatal(r15);
            closeStreamSilently();
            r9.error = r15;
         */
        /* JADX WARN: Code restructure failed: missing block: B:16:?, code lost:
        
            return;
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private void readId(long r10) {
            /*
                r9 = this;
                org.slf4j.Logger r0 = org.davidmoten.rx2.io.internal.FlowableFromInputStream.FromStreamSubscription.log
                java.lang.String r1 = "reading id"
                r0.debug(r1)
                r0 = r9
                java.io.InputStream r0 = r0.in     // Catch: java.lang.Throwable -> L15
                long r0 = org.davidmoten.rx2.io.internal.Util.readLong(r0)     // Catch: java.lang.Throwable -> L15
                r12 = r0
                goto L27
            L15:
                r14 = move-exception
                r0 = r14
                io.reactivex.exceptions.Exceptions.throwIfFatal(r0)
                r0 = r9
                r0.closeStreamSilently()
                r0 = r9
                r1 = r14
                r0.error = r1
                return
            L27:
                org.slf4j.Logger r0 = org.davidmoten.rx2.io.internal.FlowableFromInputStream.FromStreamSubscription.log
                java.lang.String r1 = "id={}"
                r2 = r12
                java.lang.Long r2 = java.lang.Long.valueOf(r2)
                r0.debug(r1, r2)
            L35:
                r0 = r9
                java.util.concurrent.atomic.AtomicReference<org.davidmoten.rx2.io.internal.FlowableFromInputStream$IdRequested> r0 = r0.requested
                java.lang.Object r0 = r0.get()
                org.davidmoten.rx2.io.internal.FlowableFromInputStream$IdRequested r0 = (org.davidmoten.rx2.io.internal.FlowableFromInputStream.IdRequested) r0
                r14 = r0
                r0 = r14
                if (r0 != 0) goto L4e
                r0 = r9
                r1 = r12
                r0.cancelUpstream(r1)
                goto La4
            L4e:
                r0 = r9
                java.util.concurrent.atomic.AtomicReference<org.davidmoten.rx2.io.internal.FlowableFromInputStream$IdRequested> r0 = r0.requested
                r1 = r14
                org.davidmoten.rx2.io.internal.FlowableFromInputStream$IdRequested r2 = new org.davidmoten.rx2.io.internal.FlowableFromInputStream$IdRequested
                r3 = r2
                r4 = r12
                r5 = r10
                r3.<init>(r4, r5)
                boolean r0 = r0.compareAndSet(r1, r2)
                if (r0 == 0) goto La1
                org.slf4j.Logger r0 = org.davidmoten.rx2.io.internal.FlowableFromInputStream.FromStreamSubscription.log     // Catch: java.lang.Exception -> L8d
                java.lang.String r1 = "requesting id={}, n={}"
                r2 = r14
                long r2 = r2.id     // Catch: java.lang.Exception -> L8d
                java.lang.Long r2 = java.lang.Long.valueOf(r2)     // Catch: java.lang.Exception -> L8d
                r3 = r10
                java.lang.Long r3 = java.lang.Long.valueOf(r3)     // Catch: java.lang.Exception -> L8d
                r0.debug(r1, r2, r3)     // Catch: java.lang.Exception -> L8d
                r0 = r9
                io.reactivex.functions.BiConsumer<java.lang.Long, java.lang.Long> r0 = r0.requester     // Catch: java.lang.Exception -> L8d
                r1 = r12
                java.lang.Long r1 = java.lang.Long.valueOf(r1)     // Catch: java.lang.Exception -> L8d
                r2 = r10
                java.lang.Long r2 = java.lang.Long.valueOf(r2)     // Catch: java.lang.Exception -> L8d
                r0.accept(r1, r2)     // Catch: java.lang.Exception -> L8d
                goto La4
            L8d:
                r15 = move-exception
                r0 = r15
                io.reactivex.exceptions.Exceptions.throwIfFatal(r0)
                r0 = r9
                r0.closeStreamSilently()
                r0 = r9
                r1 = r15
                r0.error = r1
                goto La4
            La1:
                goto L35
            La4:
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.davidmoten.rx2.io.internal.FlowableFromInputStream.FromStreamSubscription.readId(long):void");
        }

        public void request(long j) {
            log.debug("request {}", Long.valueOf(j));
            if (SubscriptionHelper.validate(j)) {
                if (this.requested.compareAndSet(HAVE_NOT_READ_ID, new IdRequested(ID_UNKNOWN, ID_UNKNOWN))) {
                    readId(j);
                    drain();
                    return;
                }
                while (true) {
                    IdRequested idRequested = this.requested.get();
                    if (idRequested == null || idRequested.requested == Long.MAX_VALUE) {
                        break;
                    }
                    long j2 = idRequested.requested + j;
                    if (j2 < ID_UNKNOWN) {
                        j2 = Long.MAX_VALUE;
                    }
                    if (this.requested.compareAndSet(idRequested, new IdRequested(idRequested.id, j2))) {
                        if (idRequested.id != ID_UNKNOWN) {
                            try {
                                log.debug("requesting {} from stream {}", Long.valueOf(idRequested.requested), Long.valueOf(j));
                                this.requester.accept(Long.valueOf(idRequested.id), Long.valueOf(j));
                            } catch (Exception e) {
                                Exceptions.throwIfFatal(e);
                                this.error = e;
                            }
                        }
                    }
                }
                drain();
            }
        }

        private void drain() {
            Throwable th;
            if (getAndIncrement() == 0) {
                int i = 1;
                do {
                    IdRequested idRequested = this.requested.get();
                    long j = idRequested == null ? 0L : idRequested.requested;
                    long j2 = 0;
                    if (j == ID_UNKNOWN && (th = this.error) != null) {
                        this.error = null;
                        emitError(th);
                        return;
                    }
                    while (j2 != j) {
                        if (tryCancelled()) {
                            return;
                        }
                        Throwable th2 = this.error;
                        if (this.buffer == null) {
                            if (th2 != null) {
                                this.error = null;
                                emitError(th2);
                                return;
                            }
                            try {
                                this.length = Util.readInt(this.in);
                                if (this.length == Integer.MIN_VALUE) {
                                    closeStreamSilently();
                                    this.child.onComplete();
                                    return;
                                } else {
                                    this.buffer = new byte[Math.abs(this.length)];
                                    this.bufferIndex = 0;
                                }
                            } catch (IOException e) {
                                emitError(e);
                                return;
                            }
                        }
                        try {
                            int read = this.in.read(this.buffer, this.bufferIndex, Math.abs(this.length) - this.bufferIndex);
                            if (read == -1) {
                                emitError(new EOFException("encountered EOF before expected length was read"));
                                return;
                            }
                            this.bufferIndex += read;
                            if (this.bufferIndex == Math.abs(this.length)) {
                                if (this.length < 0) {
                                    String str = new String(this.buffer, 0, -this.length, StandardCharsets.UTF_8);
                                    this.buffer = null;
                                    this.child.onError(new RuntimeException(str));
                                    return;
                                } else {
                                    this.child.onNext(ByteBuffer.wrap(this.buffer, 0, this.length));
                                    this.buffer = null;
                                    j2++;
                                }
                            }
                        } catch (Throwable th3) {
                            emitError(th3);
                            return;
                        }
                    }
                    if (j2 != ID_UNKNOWN) {
                        FlowableFromInputStream.produced(this.requested, j2);
                    }
                    i = addAndGet(-i);
                } while (i != 0);
            }
        }

        private void emitError(Throwable th) {
            if (cancelled()) {
                return;
            }
            closeStreamSilently();
            System.out.println("emitting error " + th.getMessage());
            this.child.onError(th);
        }

        private boolean tryCancelled() {
            return this.requested.get() == null;
        }

        private void closeStreamSilently() {
            Util.close(this.in);
        }

        private boolean cancelled() {
            return this.requested.get() == null;
        }

        public void cancel() {
            IdRequested idRequested;
            do {
                idRequested = this.requested.get();
                if (idRequested == null) {
                    return;
                }
            } while (!this.requested.compareAndSet(idRequested, null));
            if (idRequested.id != ID_UNKNOWN) {
                cancelUpstream(idRequested.id);
            }
        }

        private void cancelUpstream(long j) {
            log.debug("cancelUpstream");
            try {
                try {
                    this.requester.accept(Long.valueOf(j), -1L);
                    closeStreamSilently();
                } catch (Exception e) {
                    Exceptions.throwIfFatal(e);
                    RxJavaPlugins.onError(e);
                    closeStreamSilently();
                }
            } catch (Throwable th) {
                closeStreamSilently();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/davidmoten/rx2/io/internal/FlowableFromInputStream$IdRequested.class */
    public static final class IdRequested {
        final long id;
        final long requested;

        IdRequested(long j, long j2) {
            this.id = j;
            this.requested = j2;
        }
    }

    public FlowableFromInputStream(InputStream inputStream, BiConsumer<Long, Long> biConsumer) {
        this.in = inputStream;
        this.requester = biConsumer;
    }

    protected void subscribeActual(Subscriber<? super ByteBuffer> subscriber) {
        log.debug("subscribeActual");
        new FromStreamSubscription(this.in, this.requester, subscriber).start();
    }

    @VisibleForTesting
    static void produced(AtomicReference<IdRequested> atomicReference, long j) {
        IdRequested idRequested;
        long j2;
        do {
            idRequested = atomicReference.get();
            if (idRequested == null) {
                return;
            }
            j2 = idRequested.requested - j;
            if (j2 < 0) {
                RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + j2));
                j2 = 0;
            }
        } while (!atomicReference.compareAndSet(idRequested, new IdRequested(idRequested.id, j2)));
    }
}
