package io.evitadb.core.async;

import io.evitadb.api.configuration.ThreadPoolOptions;
import io.evitadb.core.metric.event.system.BackgroundTaskTimedOutEvent;
import io.evitadb.exception.EvitaInvalidUsageException;
import java.lang.Thread;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor.class */
/**
 * Executor service that runs work on a dedicated {@link ForkJoinPool} and wraps every task it accepts in an
 * {@link ObservableTask}, so that unfinished tasks can be tracked through weak references and cancelled by
 * {@code cancelTimedOutTasks()} once they are reported as timed out.
 */
public class ObservableThreadExecutor implements ObservableExecutorServiceWithHardDeadline {
    /** Class-wide logger; the nested handler and task classes log through it as well. */
    private static final Logger log = LoggerFactory.getLogger(ObservableThreadExecutor.class);
    /** Maximum number of queue entries moved into {@link #buffer} in a single drain step. */
    private static final int BUFFER_CAPACITY = 512;
    /** Executor name; used in worker thread names, handed to the rejection handler and to the timed-out event. */
    private final String name;
    /** Pool that actually executes the wrapped tasks. */
    private final ForkJoinPool forkJoinPool;
    /** Handler invoked whenever a task is rejected by the pool or cannot be tracked in {@link #queue}. */
    private final EvitaRejectingExecutorHandler rejectedExecutionHandler;
    /** Default timeout, in milliseconds, applied to tasks that are not given an explicit one. */
    private final long timeoutInMilliseconds;
    /** Bounded queue of weak references to the tasks being tracked for timeout. */
    private final ArrayBlockingQueue<WeakReference<ObservableTask>> queue;
    /** Scratch list the queue is drained into while looking for timed-out tasks; used while holding {@link #bufferLock}. */
    private final ArrayList<WeakReference<ObservableTask>> buffer = new ArrayList<>(BUFFER_CAPACITY);
    /** Lock ensuring only one thread at a time scans the queue for timed-out tasks. */
    private final ReentrantLock bufferLock = new ReentrantLock();
    /** Number of tasks successfully handed over to the pool. */
    private final AtomicLong submittedTaskCount = new AtomicLong();
    /** Number of rejections; incremented through the callback passed to the rejection handler. */
    private final AtomicLong rejectedTaskCount = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor$EvitaWorkerThread.class */
    /**
     * Worker thread used by the executor's {@link ForkJoinPool}. Every instance is a daemon thread named
     * {@code Evita-<executor name>-<sequence number>} that reports uncaught exceptions through
     * {@link LoggingUncaughtExceptionHandler}.
     */
    public static class EvitaWorkerThread extends ForkJoinWorkerThread {
        /** Sequence shared by all instances; provides the numeric suffix of the thread name. */
        private static final AtomicInteger THREAD_COUNTER = new AtomicInteger();

        /**
         * Creates a worker thread bound to the given pool.
         *
         * @param pool         pool the thread will work for
         * @param executorName executor name embedded in the thread name
         * @param priority     thread priority passed to {@link Thread#setPriority(int)}
         */
        protected EvitaWorkerThread(@Nonnull ForkJoinPool pool, @Nonnull String executorName, int priority) {
            super(pool);
            setDaemon(true);
            final int sequenceNumber = THREAD_COUNTER.incrementAndGet();
            final String threadName = "Evita-" + executorName + "-" + sequenceNumber;
            setName(threadName);
            setPriority(priority);
            setUncaughtExceptionHandler(LoggingUncaughtExceptionHandler.INSTANCE);
        }
    }

    /* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor$LoggingUncaughtExceptionHandler.class */
    private static class LoggingUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
        public static final LoggingUncaughtExceptionHandler INSTANCE = new LoggingUncaughtExceptionHandler();

        private LoggingUncaughtExceptionHandler() {
        }

        @Override // java.lang.Thread.UncaughtExceptionHandler
        public void uncaughtException(Thread thread, Throwable th) {
            ObservableThreadExecutor.log.error("Uncaught exception in thread {}", thread.getName(), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor$ObservableCallable.class */
    /**
     * {@link Callable} wrapper that mirrors the outcome of the wrapped computation into an internal
     * {@link CompletableFuture}, so the executor can find out whether the task has finished, whether its
     * deadline has passed, and mark it as cancelled.
     *
     * @param <V> type of the computed result
     */
    public static class ObservableCallable<V> implements Callable<V>, ObservableTask {
        /** Human-readable task identification returned from {@link #toString()}. */
        private final String name;
        /** The computation being wrapped. */
        private final Callable<V> delegate;
        /** Deadline in epoch milliseconds: creation time plus the timeout passed to the constructor. */
        private final long timedOutAt;
        /** Mirrors completion, failure or cancellation of the task. */
        private final CompletableFuture<V> future = new CompletableFuture<>();

        /**
         * Creates a task named after the code location that handed it to the executor.
         *
         * @param callable computation to wrap
         * @param j        timeout in milliseconds, counted from now
         */
        public ObservableCallable(@Nonnull Callable<V> callable, long j) {
            this(resolveSubmitterName(), callable, j);
        }

        /**
         * Creates a task with an explicit name.
         *
         * @param str      name reported by {@link #toString()}
         * @param callable computation to wrap
         * @param j        timeout in milliseconds, counted from now
         */
        public ObservableCallable(@Nonnull String str, @Nonnull Callable<V> callable, long j) {
            this.name = str;
            this.delegate = callable;
            this.timedOutAt = System.currentTimeMillis() + j;
        }

        /**
         * Finds the first stack frame that belongs neither to the JDK nor to the executor (or its nested
         * classes). Taking a fixed index such as {@code stackTrace[1]} would always yield the constructor of
         * this class, which makes every unnamed task indistinguishable in the logs.
         *
         * @return textual form of the submitting frame, or {@code "Unknown"} when no such frame exists
         */
        @Nonnull
        private static String resolveSubmitterName() {
            final String executorClassName = ObservableThreadExecutor.class.getName();
            for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
                final String className = frame.getClassName();
                final boolean jdkFrame = className.startsWith("java.") || className.startsWith("jdk.");
                final boolean executorFrame = className.equals(executorClassName)
                        || className.startsWith(executorClassName + "$");
                if (!jdkFrame && !executorFrame) {
                    return frame.toString();
                }
            }
            return "Unknown";
        }

        /** Returns {@code true} once the task completed, failed or was cancelled. */
        @Override
        public boolean isFinished() {
            return this.future.isDone();
        }

        /**
         * Returns {@code true} when the stored deadline is strictly earlier than the supplied timestamp.
         *
         * @param j timestamp in epoch milliseconds to compare the deadline with
         */
        @Override
        public boolean isTimedOut(long j) {
            return this.timedOutAt < j;
        }

        /** Cancels the internal future; afterwards {@link #isFinished()} returns {@code true}. */
        @Override
        public void cancel() {
            this.future.cancel(true);
        }

        /**
         * Runs the wrapped computation and records its outcome in the internal future.
         *
         * @return the value produced by the delegate
         * @throws Exception whatever the delegate throws; it is logged and rethrown unchanged
         */
        @Override
        public V call() throws Exception {
            try {
                final V result = this.delegate.call();
                this.future.complete(result);
                return result;
            } catch (Throwable th) {
                this.future.completeExceptionally(th);
                ObservableThreadExecutor.log.error("Uncaught exception in task.", th);
                throw th;
            }
        }

        @Override
        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor$ObservableRunnable.class */
    /**
     * {@link Runnable} wrapper that mirrors the outcome of the wrapped action into an internal
     * {@link CompletableFuture}, so the executor can find out whether the task has finished, whether its
     * deadline has passed, and mark it as cancelled.
     */
    public static class ObservableRunnable implements Runnable, ObservableTask {
        /** Human-readable task identification returned from {@link #toString()}. */
        private final String name;
        /** The action being wrapped. */
        private final Runnable delegate;
        /** Deadline in epoch milliseconds: creation time plus the timeout passed to the constructor. */
        private final long timedOutAt;
        /** Mirrors completion, failure or cancellation of the task. */
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        /**
         * Creates a task named after the code location that handed it to the executor.
         *
         * @param runnable action to wrap
         * @param j        timeout in milliseconds, counted from now
         */
        public ObservableRunnable(@Nonnull Runnable runnable, long j) {
            this(resolveSubmitterName(), runnable, j);
        }

        /**
         * Creates a task with an explicit name.
         *
         * @param str      name reported by {@link #toString()}
         * @param runnable action to wrap
         * @param j        timeout in milliseconds, counted from now
         */
        public ObservableRunnable(@Nonnull String str, @Nonnull Runnable runnable, long j) {
            this.name = str;
            this.delegate = runnable;
            this.timedOutAt = System.currentTimeMillis() + j;
        }

        /**
         * Finds the first stack frame that belongs neither to the JDK nor to the executor (or its nested
         * classes). Taking a fixed index such as {@code stackTrace[1]} would always yield the constructor of
         * this class, which makes every unnamed task indistinguishable in the logs.
         *
         * @return textual form of the submitting frame, or {@code "Unknown"} when no such frame exists
         */
        @Nonnull
        private static String resolveSubmitterName() {
            final String executorClassName = ObservableThreadExecutor.class.getName();
            for (StackTraceElement frame : Thread.currentThread().getStackTrace()) {
                final String className = frame.getClassName();
                final boolean jdkFrame = className.startsWith("java.") || className.startsWith("jdk.");
                final boolean executorFrame = className.equals(executorClassName)
                        || className.startsWith(executorClassName + "$");
                if (!jdkFrame && !executorFrame) {
                    return frame.toString();
                }
            }
            return "Unknown";
        }

        /** Returns {@code true} once the task completed, failed or was cancelled. */
        @Override
        public boolean isFinished() {
            return this.future.isDone();
        }

        /**
         * Returns {@code true} when the stored deadline is strictly earlier than the supplied timestamp.
         *
         * @param j timestamp in epoch milliseconds to compare the deadline with
         */
        @Override
        public boolean isTimedOut(long j) {
            return this.timedOutAt < j;
        }

        /** Cancels the internal future; afterwards {@link #isFinished()} returns {@code true}. */
        @Override
        public void cancel() {
            this.future.cancel(true);
        }

        /**
         * Runs the wrapped action and records its outcome in the internal future. Anything thrown by the
         * delegate is logged and rethrown unchanged.
         */
        @Override
        public void run() {
            try {
                this.delegate.run();
                this.future.complete(null);
            } catch (Throwable th) {
                this.future.completeExceptionally(th);
                ObservableThreadExecutor.log.error("Uncaught exception in task.", th);
                throw th;
            }
        }

        @Override
        public String toString() {
            return this.name;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/evitadb/core/async/ObservableThreadExecutor$ObservableTask.class */
    /**
     * Contract of a task whose life cycle can be inspected and ended from the outside.
     */
    public interface ObservableTask {
        /**
         * Tells whether the task no longer needs to be tracked.
         *
         * @return {@code true} when the task is finished
         */
        boolean isFinished();

        /**
         * Tells whether the task is considered timed out relative to the supplied timestamp.
         *
         * @param referenceTimeMillis timestamp in milliseconds the task's deadline is evaluated against
         * @return {@code true} when the task is timed out
         */
        boolean isTimedOut(long referenceTimeMillis);

        /**
         * Requests cancellation of the task.
         */
        void cancel();
    }

    /**
     * Creates the executor together with its backing {@link ForkJoinPool}.
     *
     * @param name                  executor name used in worker thread names, rejection handling and events
     * @param options               thread pool sizing, thread priority and queue size
     * @param scheduler             scheduler asked to run the timed-out task scan when the timeout is positive
     * @param timeoutInMilliseconds default task timeout; values from 0 to 99 are refused
     * @throws EvitaInvalidUsageException when the timeout lies between 0 and 99 milliseconds (inclusive)
     */
    public ObservableThreadExecutor(@Nonnull String name, @Nonnull ThreadPoolOptions options, @Nonnull Scheduler scheduler, long timeoutInMilliseconds) {
        this.name = name;
        // parallelism never exceeds the number of processors reported by the runtime
        final int parallelism = Math.min(options.minThreadCount(), Runtime.getRuntime().availableProcessors());
        this.forkJoinPool = new ForkJoinPool(
                parallelism,
                pool -> new EvitaWorkerThread(pool, name, options.threadPriority()),
                LoggingUncaughtExceptionHandler.INSTANCE,
                true,
                options.minThreadCount(),
                options.maxThreadCount(),
                1,
                null,
                60L,
                TimeUnit.SECONDS
        );
        // every rejection reported by the handler bumps the rejected task counter
        this.rejectedExecutionHandler = new EvitaRejectingExecutorHandler(name, this.rejectedTaskCount::incrementAndGet);
        this.timeoutInMilliseconds = timeoutInMilliseconds;
        // the tracking queue is twice as large as the configured queue size
        this.queue = new ArrayBlockingQueue<>(options.queueSize() << 1);
        final boolean timeoutTooShort = timeoutInMilliseconds > -1 && timeoutInMilliseconds < 100;
        if (timeoutTooShort) {
            throw new EvitaInvalidUsageException("The timeout must be at least 100 milliseconds.");
        }
        if (timeoutInMilliseconds > 0) {
            scheduler.schedule(this::cancelTimedOutTasks, timeoutInMilliseconds, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Gives direct access to the pool that executes the tasks.
     *
     * @return the backing {@link ForkJoinPool}
     */
    @Nonnull
    public ForkJoinPool getForkJoinPoolInternal() {
        return forkJoinPool;
    }

    /**
     * Returns the timeout applied to tasks that do not specify their own.
     *
     * @return default timeout in milliseconds
     */
    @Override
    public long getDefaultTimeoutInMilliseconds() {
        return timeoutInMilliseconds;
    }

    /**
     * Returns the current value of the submitted task counter.
     *
     * @return number of tasks handed over to the pool so far
     */
    @Override
    public long getSubmittedTaskCount() {
        final long submitted = submittedTaskCount.get();
        return submitted;
    }

    /**
     * Returns the current value of the rejected task counter.
     *
     * @return number of rejections recorded so far
     */
    @Override
    public long getRejectedTaskCount() {
        final long rejected = rejectedTaskCount.get();
        return rejected;
    }

    /**
     * Wraps the action in a tracked task with the default timeout and passes it to the pool.
     * A {@link RejectedExecutionException} raised by the pool is not rethrown by this method; it is only
     * reported to the rejection handler.
     *
     * @param runnable action to execute
     */
    @Override
    public void execute(@Nonnull Runnable runnable) {
        try {
            final Runnable trackedTask = registerTask(runnable, timeoutInMilliseconds);
            forkJoinPool.execute(trackedTask);
            submittedTaskCount.incrementAndGet();
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
        }
    }

    /**
     * Wraps the computation in a tracked task with the default timeout and submits it to the pool.
     *
     * @param callable computation to run
     * @param <T>      type of the computed result
     * @return task representing the pending computation
     * @throws RejectedExecutionException when the pool refuses the task; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public <T> ForkJoinTask<T> submit(@Nonnull Callable<T> callable) {
        try {
            // keep the generic type instead of a raw Callable so the result stays type-checked
            final Callable<T> trackedTask = registerTask(callable, this.timeoutInMilliseconds);
            final ForkJoinTask<T> submitted = this.forkJoinPool.submit(trackedTask);
            this.submittedTaskCount.incrementAndGet();
            return submitted;
        } catch (RejectedExecutionException e) {
            this.rejectedExecutionHandler.rejectedExecution();
            throw e;
        }
    }

    /**
     * Wraps the action in a tracked task with the default timeout and submits it to the pool; the returned
     * task yields the supplied value once the action completes.
     *
     * @param runnable action to run
     * @param t        value the returned task resolves to
     * @param <T>      type of the value
     * @return task representing the pending action
     * @throws RejectedExecutionException when the pool refuses the task; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public <T> ForkJoinTask<T> submit(@Nonnull Runnable runnable, T t) {
        try {
            final Runnable trackedTask = registerTask(runnable, this.timeoutInMilliseconds);
            // the result is passed as-is; casting it to Runnable would fail for any other result type
            final ForkJoinTask<T> submitted = this.forkJoinPool.submit(trackedTask, t);
            this.submittedTaskCount.incrementAndGet();
            return submitted;
        } catch (RejectedExecutionException e) {
            this.rejectedExecutionHandler.rejectedExecution();
            throw e;
        }
    }

    /**
     * Wraps the action in a tracked task with the default timeout and submits it to the pool.
     *
     * @param runnable action to run
     * @return task representing the pending action
     * @throws RejectedExecutionException when the pool refuses the task; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public ForkJoinTask<?> submit(@Nonnull Runnable runnable) {
        final ForkJoinTask<?> submitted;
        try {
            final Runnable trackedTask = registerTask(runnable, timeoutInMilliseconds);
            submitted = forkJoinPool.submit(trackedTask);
            submittedTaskCount.incrementAndGet();
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
            throw rejection;
        }
        return submitted;
    }

    /**
     * Wraps every computation in a tracked task with the default timeout and invokes them all on the pool.
     * The submitted task counter grows by the number of returned futures.
     *
     * @param collection computations to run
     * @param <T>        type of the computed results
     * @return futures returned by the pool
     * @throws RejectedExecutionException when the pool refuses the tasks; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> collection) {
        try {
            final List<Callable<T>> trackedTasks = collection.stream()
                    .map(task -> registerTask(task, this.timeoutInMilliseconds))
                    .toList();
            final List<Future<T>> futures = forkJoinPool.invokeAll(trackedTasks);
            submittedTaskCount.addAndGet(futures.size());
            return futures;
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
            throw rejection;
        }
    }

    /**
     * Wraps every computation in a tracked task and invokes them all on the pool. The per-task timeout is the
     * smaller of the default timeout and the requested one; the requested timeout is not passed to the pool
     * call itself.
     *
     * @param collection computations to run
     * @param timeout    requested timeout
     * @param unit       unit of {@code timeout}
     * @param <T>        type of the computed results
     * @return futures returned by the pool
     * @throws RejectedExecutionException when the pool refuses the tasks; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public <T> List<Future<T>> invokeAll(@Nonnull Collection<? extends Callable<T>> collection, long timeout, @Nonnull TimeUnit unit) {
        try {
            final long requestedMillis = unit.toMillis(timeout);
            final long effectiveTimeout = Math.min(timeoutInMilliseconds, requestedMillis);
            final List<Callable<T>> trackedTasks = collection.stream()
                    .map(task -> registerTask(task, effectiveTimeout))
                    .toList();
            final List<Future<T>> futures = forkJoinPool.invokeAll(trackedTasks);
            submittedTaskCount.addAndGet(futures.size());
            return futures;
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
            throw rejection;
        }
    }

    /**
     * Wraps every computation in a tracked task with the default timeout and asks the pool for the result of
     * any one of them. The submitted task counter grows by one regardless of the collection size.
     *
     * @param collection computations to choose from
     * @param <T>        type of the computed result
     * @return result returned by the pool
     * @throws InterruptedException       propagated from the pool
     * @throws ExecutionException         propagated from the pool
     * @throws RejectedExecutionException when the pool refuses the tasks; the rejection handler is notified first
     */
    @Override
    @Nonnull
    public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> collection) throws InterruptedException, ExecutionException {
        try {
            final List<Callable<T>> trackedTasks = collection.stream()
                    .map(task -> registerTask(task, this.timeoutInMilliseconds))
                    .toList();
            final T result = forkJoinPool.invokeAny(trackedTasks);
            submittedTaskCount.incrementAndGet();
            return result;
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
            throw rejection;
        }
    }

    /**
     * Wraps every computation in a tracked task and asks the pool for the result of any one of them. The
     * per-task timeout is the smaller of the default timeout and the requested one; the requested timeout is
     * not passed to the pool call itself. The submitted task counter grows by one.
     *
     * @param collection computations to choose from
     * @param timeout    requested timeout
     * @param unit       unit of {@code timeout}
     * @param <T>        type of the computed result
     * @return result returned by the pool
     * @throws InterruptedException       propagated from the pool
     * @throws ExecutionException         propagated from the pool
     * @throws RejectedExecutionException when the pool refuses the tasks; the rejection handler is notified first
     */
    @Override
    public <T> T invokeAny(@Nonnull Collection<? extends Callable<T>> collection, long timeout, @Nonnull TimeUnit unit) throws InterruptedException, ExecutionException {
        try {
            final long requestedMillis = unit.toMillis(timeout);
            final long effectiveTimeout = Math.min(timeoutInMilliseconds, requestedMillis);
            final List<Callable<T>> trackedTasks = collection.stream()
                    .map(task -> registerTask(task, effectiveTimeout))
                    .toList();
            final T result = forkJoinPool.invokeAny(trackedTasks);
            submittedTaskCount.incrementAndGet();
            return result;
        } catch (RejectedExecutionException rejection) {
            rejectedExecutionHandler.rejectedExecution();
            throw rejection;
        }
    }

    /**
     * Forwards the shutdown request to the backing pool.
     */
    @Override
    public void shutdown() {
        forkJoinPool.shutdown();
    }

    /**
     * Forwards the immediate shutdown request to the backing pool.
     *
     * @return whatever list the pool returns from {@code shutdownNow()}
     */
    @Override
    @Nonnull
    public List<Runnable> shutdownNow() {
        final List<Runnable> reportedByPool = forkJoinPool.shutdownNow();
        return reportedByPool;
    }

    /**
     * Reports whether the backing pool has been shut down.
     *
     * @return the pool's {@code isShutdown()} value
     */
    @Override
    public boolean isShutdown() {
        return forkJoinPool.isShutdown();
    }

    /**
     * Reports whether the backing pool has terminated.
     *
     * @return the pool's {@code isTerminated()} value
     */
    @Override
    public boolean isTerminated() {
        return forkJoinPool.isTerminated();
    }

    /**
     * Waits for the backing pool to terminate.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the pool's {@code awaitTermination} result
     * @throws InterruptedException propagated from the pool
     */
    @Override
    public boolean awaitTermination(long timeout, @Nonnull TimeUnit unit) throws InterruptedException {
        final boolean terminated = forkJoinPool.awaitTermination(timeout, unit);
        return terminated;
    }

    /**
     * Wraps the action in an {@link ObservableRunnable} and registers it in the tracking queue.
     *
     * @param runnable             action to wrap
     * @param timeoutMilliseconds  timeout of the task in milliseconds
     * @return the tracked wrapper
     */
    @Nonnull
    private Runnable registerTask(@Nonnull Runnable runnable, long timeoutMilliseconds) {
        final ObservableRunnable trackedTask = new ObservableRunnable(runnable, timeoutMilliseconds);
        return addTaskToQueue(trackedTask);
    }

    /**
     * Wraps the computation in an {@link ObservableCallable} and registers it in the tracking queue.
     *
     * @param callable computation to wrap
     * @param j        timeout of the task in milliseconds
     * @param <V>      type of the computed result
     * @return the tracked wrapper
     */
    @Nonnull
    private <V> Callable<V> registerTask(@Nonnull Callable<V> callable, long j) {
        // parameterized instantiation keeps the result type checked; no raw Callable cast is needed
        final ObservableCallable<V> trackedTask = new ObservableCallable<>(callable, j);
        return addTaskToQueue(trackedTask);
    }

    /**
     * Registers a weak reference to the task in the tracking queue. When the queue is full, timed-out and
     * finished tasks are purged and the insertion is attempted once more; a second failure is reported to the
     * rejection handler and rethrown.
     *
     * @param t   task to track
     * @param <T> concrete task type
     * @return the same task instance
     * @throws IllegalStateException when the queue is still full after the purge
     */
    @Nonnull
    private <T extends ObservableTask> T addTaskToQueue(@Nonnull T t) {
        final WeakReference<ObservableTask> reference = new WeakReference<>(t);
        try {
            this.queue.add(reference);
            return t;
        } catch (IllegalStateException ignored) {
            // queue is full - fall through, purge it and retry below
        }
        cancelTimedOutTasks();
        try {
            this.queue.add(reference);
        } catch (IllegalStateException stillFull) {
            this.rejectedExecutionHandler.rejectedExecution();
            throw stillFull;
        }
        return t;
    }

    /**
     * Walks once through the tasks that are in the tracking queue when the scan starts. References whose
     * task was garbage collected or has finished are dropped, tasks reported as timed out are cancelled and
     * dropped, and the remaining references are put back into the queue. When another thread is already
     * scanning, this method only waits until that scan releases the lock. In both cases a
     * {@link BackgroundTaskTimedOutEvent} carrying the number of tasks cancelled by this call is committed.
     */
    private void cancelTimedOutTasks() {
        int cancelledTasks = 0;
        if (this.bufferLock.tryLock()) {
            try {
                // only the entries present at the start are examined, so re-added survivors are not rescanned
                final int initialQueueSize = this.queue.size();
                int examined = 0;
                while (examined < initialQueueSize) {
                    // NOTE(review): task deadlines already include their timeout (creation time + timeout),
                    // so subtracting the default timeout here appears to postpone cancellation by one extra
                    // timeout period - confirm this is intended.
                    final long threshold = System.currentTimeMillis() - this.timeoutInMilliseconds;
                    try {
                        examined += this.queue.drainTo(this.buffer, BUFFER_CAPACITY);
                        final Iterator<WeakReference<ObservableTask>> references = this.buffer.iterator();
                        while (references.hasNext()) {
                            final ObservableTask task = references.next().get();
                            if (task == null || task.isFinished()) {
                                // collected or finished tasks need no further tracking
                                references.remove();
                            } else if (task.isTimedOut(threshold)) {
                                cancelledTasks++;
                                log.info("Cancelling timed out task: {}", task);
                                task.cancel();
                                references.remove();
                            }
                        }
                        // survivors go back to the tail of the queue; this throws when producers refilled it
                        this.queue.addAll(this.buffer);
                    } finally {
                        // always reset the scratch buffer - entries left behind by a failed re-insert would
                        // otherwise be processed again by the next scan and re-queued as duplicates
                        this.buffer.clear();
                    }
                }
            } finally {
                this.bufferLock.unlock();
            }
        } else {
            // another thread is scanning; wait for it so the caller observes the purged queue
            while (this.bufferLock.isLocked()) {
                Thread.onSpinWait();
            }
        }
        new BackgroundTaskTimedOutEvent(this.name, cancelledTasks).commit();
    }

    /**
     * Creates a named task with the default timeout. The task is only created, not submitted.
     *
     * @param taskName name reported by the task's {@code toString()}
     * @param runnable action to wrap
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public Runnable createTask(@Nonnull String taskName, @Nonnull Runnable runnable) {
        final long defaultTimeout = timeoutInMilliseconds;
        return new ObservableRunnable(taskName, runnable, defaultTimeout);
    }

    /**
     * Creates a task with the default timeout, named after the code location that called this method.
     * The task is only created, not submitted.
     *
     * @param runnable action to wrap
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public Runnable createTask(@Nonnull Runnable runnable) {
        // frame 0 is Thread#getStackTrace and frame 1 is this method, so the caller sits at index 2
        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        final String taskName = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
        return new ObservableRunnable(taskName, runnable, this.timeoutInMilliseconds);
    }

    /**
     * Creates a named task with an explicit timeout. The task is only created, not submitted.
     *
     * @param taskName            name reported by the task's {@code toString()}
     * @param runnable            action to wrap
     * @param timeoutMilliseconds timeout of the task in milliseconds
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public Runnable createTask(@Nonnull String taskName, @Nonnull Runnable runnable, long timeoutMilliseconds) {
        final ObservableRunnable task = new ObservableRunnable(taskName, runnable, timeoutMilliseconds);
        return task;
    }

    /**
     * Creates a task with an explicit timeout, named after the code location that called this method.
     * The task is only created, not submitted.
     *
     * @param runnable action to wrap
     * @param j        timeout of the task in milliseconds
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public Runnable createTask(@Nonnull Runnable runnable, long j) {
        // frame 0 is Thread#getStackTrace and frame 1 is this method, so the caller sits at index 2
        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        final String taskName = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
        return new ObservableRunnable(taskName, runnable, j);
    }

    /**
     * Creates a named task with the default timeout. The task is only created, not submitted.
     *
     * @param str      name reported by the task's {@code toString()}
     * @param callable computation to wrap
     * @param <V>      type of the computed result
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public <V> Callable<V> createTask(@Nonnull String str, @Nonnull Callable<V> callable) {
        // parameterized instantiation instead of the raw type keeps the result type checked
        return new ObservableCallable<>(str, callable, this.timeoutInMilliseconds);
    }

    /**
     * Creates a task with the default timeout, named after the code location that called this method.
     * The task is only created, not submitted.
     *
     * @param callable computation to wrap
     * @param <V>      type of the computed result
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public <V> Callable<V> createTask(@Nonnull Callable<V> callable) {
        // frame 0 is Thread#getStackTrace and frame 1 is this method, so the caller sits at index 2
        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        final String taskName = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
        return new ObservableCallable<>(taskName, callable, this.timeoutInMilliseconds);
    }

    /**
     * Creates a named task with an explicit timeout. The task is only created, not submitted.
     *
     * @param str      name reported by the task's {@code toString()}
     * @param callable computation to wrap
     * @param j        timeout of the task in milliseconds
     * @param <V>      type of the computed result
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public <V> Callable<V> createTask(@Nonnull String str, @Nonnull Callable<V> callable, long j) {
        // parameterized instantiation instead of the raw type keeps the result type checked
        return new ObservableCallable<>(str, callable, j);
    }

    /**
     * Creates a task with an explicit timeout, named after the code location that called this method.
     * The task is only created, not submitted.
     *
     * @param callable computation to wrap
     * @param j        timeout of the task in milliseconds
     * @param <V>      type of the computed result
     * @return the wrapping task
     */
    @Override
    @Nonnull
    public <V> Callable<V> createTask(@Nonnull Callable<V> callable, long j) {
        // frame 0 is Thread#getStackTrace and frame 1 is this method, so the caller sits at index 2
        final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        final String taskName = stackTrace.length > 2 ? stackTrace[2].toString() : "Unknown";
        return new ObservableCallable<>(taskName, callable, j);
    }

    /**
     * Overload of {@code submit} declared with the raw {@link Future} return type and an {@code Object}
     * result argument; it only forwards to another {@code submit} overload of this class.
     *
     * NOTE(review): the bridge/synthetic markers suggest this method was generated by the compiler rather
     * than written by hand - confirm whether it should exist in source form at all.
     * NOTE(review): the {@code (Runnable)} cast applied to the result argument looks unintended, since the
     * argument is the value the future should yield, not a task - verify before relying on this method.
     */
    @Override // java.util.concurrent.ExecutorService
    @Nonnull
    public /* bridge */ /* synthetic */ Future submit(@Nonnull Runnable runnable, Object obj) {
        return submit(runnable, (Runnable) obj);
    }
}
