package org.eclipse.ditto.client.streaming;

import java.time.Duration;
import java.util.Collections;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/eclipse/ditto/client/streaming/SpliteratorSubscriber.class */
public final class SpliteratorSubscriber<T> implements Subscriber<T>, Spliterator<T> {
    private static final Logger LOGGER = LoggerFactory.getLogger(SpliteratorSubscriber.class);
    private final BlockingQueue<Element<T>> buffer;
    private final long timeoutMillis;
    private final int capacity;
    private final int batchSize;
    private final AtomicReference<Subscription> subscription = new AtomicReference<>();
    private final AtomicInteger splits = new AtomicInteger(1);
    private final AtomicInteger quota = new AtomicInteger(0);
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/client/streaming/SpliteratorSubscriber$Completed.class */
    public static final class Completed<T> implements Element<T> {
        private Completed() {
        }

        @Override // org.eclipse.ditto.client.streaming.SpliteratorSubscriber.Element
        public <S> S eval(Function<T, S> function, Supplier<S> supplier, Function<Throwable, S> function2) {
            return supplier.get();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/client/streaming/SpliteratorSubscriber$Element.class */
    public interface Element<T> {
        <S> S eval(Function<T, S> function, Supplier<S> supplier, Function<Throwable, S> function2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/eclipse/ditto/client/streaming/SpliteratorSubscriber$Failed.class */
    public static final class Failed<T> implements Element<T> {
        private final Throwable error;

        private Failed(Throwable th) {
            this.error = th;
        }

        @Override // org.eclipse.ditto.client.streaming.SpliteratorSubscriber.Element
        public <S> S eval(Function<T, S> function, Supplier<S> supplier, Function<Throwable, S> function2) {
            return function2.apply(this.error);
        }
    }

    /* loaded from: input_file:org/eclipse/ditto/client/streaming/SpliteratorSubscriber$HasElement.class */
    private static final class HasElement<T> implements Element<T> {
        private final T element;

        private HasElement(T t) {
            this.element = t;
        }

        @Override // org.eclipse.ditto.client.streaming.SpliteratorSubscriber.Element
        public <S> S eval(Function<T, S> function, Supplier<S> supplier, Function<Throwable, S> function2) {
            return function.apply(this.element);
        }
    }

    private SpliteratorSubscriber(long j, int i, int i2) {
        this.buffer = new ArrayBlockingQueue((i * 2) + 1);
        this.timeoutMillis = j;
        this.batchSize = Math.min(i2, i);
        this.capacity = i;
    }

    public static <T> SpliteratorSubscriber<T> of() {
        return new SpliteratorSubscriber<>(10000L, 2, 1);
    }

    public static <T> SpliteratorSubscriber<T> of(Duration duration, int i, int i2) {
        if (duration.isNegative()) {
            throw new IllegalArgumentException("Expect positive timeout, got: " + duration);
        }
        if (i2 <= 0) {
            throw new IllegalArgumentException("Expect positive batchSize, got: " + i2);
        }
        if (i2 > i) {
            throw new IllegalArgumentException("Expect bufferSize to be at least batchSize=" + i2 + ", got: " + i);
        }
        return new SpliteratorSubscriber<>(Math.max(1L, duration.toMillis()), i, i2);
    }

    public Stream<T> asStream() {
        return StreamSupport.stream(this, false);
    }

    public void onSubscribe(Subscription subscription) {
        Subscription subscription2;
        LOGGER.trace("onSubscribe <{}>", subscription);
        ConditionChecker.checkNotNull(subscription);
        synchronized (this.subscription) {
            subscription2 = this.subscription.get();
            if (subscription2 == null) {
                this.subscription.set(subscription);
            }
        }
        if (subscription2 == null) {
            LOGGER.trace("Initial request: <{}>", Integer.valueOf(this.capacity));
            subscription.request(this.capacity);
        } else {
            LOGGER.warn("onSubscribe() called a second time; cancelling subscription <{}>.", subscription);
            subscription.cancel();
        }
    }

    public void onNext(T t) {
        LOGGER.trace("onNext <{}>", t);
        this.buffer.add(new HasElement(ConditionChecker.checkNotNull(t)));
    }

    public void onError(Throwable th) {
        LOGGER.trace("onError", th);
        this.cancelled.set(true);
        addErrors(th);
    }

    public void onComplete() {
        LOGGER.trace("onComplete");
        this.cancelled.set(true);
        addEos();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void cancelOnError(Consumer<? super T> consumer, T t) {
        try {
            consumer.accept(t);
        } catch (RuntimeException e) {
            this.cancelled.set(true);
            addErrors(e);
            Subscription subscription = this.subscription.get();
            if (subscription != null) {
                subscription.cancel();
            }
            throw e;
        }
    }

    private void addEos() {
        addNCopies(new Completed());
    }

    private void addErrors(Throwable th) {
        addNCopies(new Failed(th));
    }

    private void addNCopies(Element<T> element) {
        this.buffer.addAll(Collections.nCopies(this.capacity + 1, element));
    }

    private void request() {
        if (this.cancelled.get()) {
            return;
        }
        if (this.quota.getAndUpdate(i -> {
            return i >= this.batchSize ? i - this.batchSize : i;
        }) < this.batchSize) {
            LOGGER.trace("Not requesting: not enough quota.");
        } else {
            LOGGER.trace("Request <{}>", Integer.valueOf(this.batchSize));
            this.subscription.get().request(this.batchSize);
        }
    }

    @Override // java.util.Spliterator
    public boolean tryAdvance(Consumer<? super T> consumer) {
        try {
            Element<T> poll = this.buffer.poll(this.timeoutMillis, TimeUnit.MILLISECONDS);
            if (poll == null) {
                throw new IllegalStateException("timed out after " + this.timeoutMillis + " ms");
            }
            this.quota.getAndUpdate(i -> {
                return Math.min(this.capacity, i + 1);
            });
            return ((Boolean) poll.eval(obj -> {
                cancelOnError(consumer, obj);
                request();
                return true;
            }, () -> {
                this.buffer.add(poll);
                return false;
            }, th -> {
                this.buffer.add(poll);
                throw wrapAsRuntimeException(th);
            })).booleanValue();
        } catch (Exception e) {
            throw new CompletionException(e);
        }
    }

    @Override // java.util.Spliterator
    public Spliterator<T> trySplit() {
        if (this.splits.updateAndGet(i -> {
            return Math.min(this.capacity, i) + 1;
        }) <= this.capacity) {
            return this;
        }
        return null;
    }

    @Override // java.util.Spliterator
    public long estimateSize() {
        int size = this.buffer.size();
        if (size > this.capacity) {
            return 0L;
        }
        if (0 >= size || size >= this.capacity) {
            return Long.MAX_VALUE;
        }
        return size;
    }

    @Override // java.util.Spliterator
    public int characteristics() {
        return 5376;
    }

    private static RuntimeException wrapAsRuntimeException(Throwable th) {
        return th instanceof RuntimeException ? (RuntimeException) th : new CompletionException("SpliteratorSubscriber encountered " + th.getClass() + " while reading from its publisher", th);
    }
}
