/*
 * Decompiled with CFR 0.152.
 */
package tech.ydb.yoj.repository.ydb;

import com.google.common.annotations.VisibleForTesting;
import java.time.Duration;
import java.util.Spliterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import tech.ydb.core.Status;
import tech.ydb.yoj.repository.db.exception.DeadlineExceededException;
import tech.ydb.yoj.repository.db.exception.QueryInterruptedException;
import tech.ydb.yoj.repository.ydb.YdbOperations;
import tech.ydb.yoj.repository.ydb.client.YdbValidator;

public class YdbSpliterator<V>
implements Spliterator<V> {
    private static final Logger log = LoggerFactory.getLogger(YdbSpliterator.class);
    private static final Duration DEFAULT_STREAM_WORK_TIMEOUT = Duration.ofMinutes(5L);
    private final long streamWorkDeadlineNanos;
    private final int flags;
    private final BlockingQueue<QueueValue<V>> queue = new ArrayBlockingQueue<QueueValue<V>>(1);
    private final BiConsumer<Status, Throwable> validateResponse;
    private volatile boolean closed = false;
    private boolean endData = false;

    public YdbSpliterator(String request, boolean isOrdered) {
        this(request, isOrdered, DEFAULT_STREAM_WORK_TIMEOUT);
    }

    @VisibleForTesting
    protected YdbSpliterator(String request, boolean isOrdered, Duration streamWorkTimeout) {
        this.flags = (isOrdered ? 16 : 0) | 0x100;
        this.streamWorkDeadlineNanos = System.nanoTime() + TimeUnit.NANOSECONDS.toNanos(YdbSpliterator.saturatedToNanos(streamWorkTimeout));
        this.validateResponse = (status, error) -> {
            if (error != null) {
                throw YdbOperations.convertToRepositoryException(error);
            }
            YdbValidator.validate(request, status.getCode(), status.toString());
        };
    }

    private long calculateTimeout() {
        return TimeUnit.NANOSECONDS.toNanos(this.streamWorkDeadlineNanos - System.nanoTime());
    }

    public Stream<V> createStream() {
        return (Stream)StreamSupport.stream(this, false).onClose(this::close);
    }

    public void onNext(V value) {
        if (this.closed) {
            throw ConsumerDoneException.INSTANCE;
        }
        try {
            if (!this.queue.offer(QueueValue.of(value), this.calculateTimeout(), TimeUnit.NANOSECONDS)) {
                log.warn("Supplier thread was closed because consumer didn't poll an element of stream on timeout");
                throw OfferDeadlineExceededException.INSTANCE;
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryInterruptedException("Supplier thread interrupted", (Throwable)e);
        }
    }

    public void onSupplierThreadComplete(Status status, Throwable ex) {
        if ((ex = YdbSpliterator.unwrapException(ex)) instanceof OfferDeadlineExceededException || this.closed) {
            return;
        }
        QueueValue endValue = QueueValue.ofEndData(status, ex);
        if (!YdbSpliterator.offerUninterruptibly(this.queue, endValue, this.streamWorkDeadlineNanos)) {
            log.warn("Supplier thread was closed because consumer didn't poll the last element of stream on timeout");
        }
    }

    @Nullable
    private QueueValue<V> poll() {
        try {
            return this.queue.poll(this.calculateTimeout(), TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new QueryInterruptedException("Consumer thread interrupted", (Throwable)e);
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super V> action) {
        if (this.closed) {
            throw new IllegalStateException("Can't use closed YdbSpliterator");
        }
        if (this.endData) {
            return false;
        }
        QueueValue<V> value = this.poll();
        if (value == null) {
            throw new DeadlineExceededException("Stream deadline exceeded on poll");
        }
        if (value.endData()) {
            this.endData = true;
            this.validateResponse.accept(value.status(), value.error());
            return false;
        }
        action.accept(value.value());
        return true;
    }

    public void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.queue.clear();
    }

    @Override
    public Spliterator<V> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return Long.MAX_VALUE;
    }

    @Override
    public long getExactSizeIfKnown() {
        return -1L;
    }

    @Override
    public int characteristics() {
        return this.flags;
    }

    private static Throwable unwrapException(Throwable ex) {
        if (ex instanceof CompletionException || ex instanceof ExecutionException) {
            return ex.getCause();
        }
        return ex;
    }

    private static long saturatedToNanos(Duration duration) {
        try {
            return duration.toNanos();
        }
        catch (ArithmeticException ignore) {
            return duration.isNegative() ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
    }

    private static <E> boolean offerUninterruptibly(BlockingQueue<E> queue, E element, long deadlineNanos) {
        boolean interrupted = false;
        while (true) {
            try {
                long timeout = TimeUnit.NANOSECONDS.toNanos(deadlineNanos - System.nanoTime());
                boolean bl = queue.offer(element, timeout, TimeUnit.NANOSECONDS);
                return bl;
            }
            catch (InterruptedException ignore) {
                interrupted = true;
                continue;
            }
            break;
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @VisibleForTesting
    protected static class ConsumerDoneException
    extends RuntimeException {
        public static final ConsumerDoneException INSTANCE = new ConsumerDoneException();

        protected ConsumerDoneException() {
        }
    }

    private record QueueValue<V>(V value, Status status, Throwable error, boolean endData) {
        public static <V> QueueValue<V> of(V value) {
            return new QueueValue<V>(value, null, null, false);
        }

        public static <V> QueueValue<V> ofEndData(Status status, Throwable ex) {
            return new QueueValue<Object>(null, status, ex, true);
        }
    }

    private static class OfferDeadlineExceededException
    extends RuntimeException {
        public static final OfferDeadlineExceededException INSTANCE = new OfferDeadlineExceededException();

        private OfferDeadlineExceededException() {
        }
    }
}

