package org.fiolino.common.processing.sink;

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.fiolino.common.container.Container;

/* loaded from: input_file:org/fiolino/common/processing/sink/ParallelizingSink.class */
public final class ParallelizingSink<T> extends ChainedSink<T, T> {
    private static final Logger logger;
    private final Consumer<Runnable> executor;
    private final String name;
    private final int parallelity;
    private ParallelizingSink<T>.Task next;
    private volatile int commitCount;
    private long timeout;
    private volatile Container currentMetadata;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/fiolino/common/processing/sink/ParallelizingSink$SynchronizationPoint.class */
    public static class SynchronizationPoint {
        private final long timeout;
        private final Container metadata;
        private volatile CountDownLatch latch;
        private final CountDownLatch initializer = new CountDownLatch(1);
        private int waiters = 1;

        SynchronizationPoint(long j, Container container) {
            this.timeout = j;
            this.metadata = container;
        }

        Container getMetadata() {
            return this.metadata;
        }

        void register() {
            if (this.waiters == -1) {
                throw new IllegalStateException("Was already started.");
            }
            this.waiters++;
        }

        void startAndWait(String str) throws InterruptedException {
            this.latch = new CountDownLatch(this.waiters);
            ParallelizingSink.logger.info(() -> {
                return "Will wait for " + this.waiters + " threads";
            });
            this.waiters = -1;
            this.initializer.countDown();
            await(str);
        }

        void syncTask(String str) throws InterruptedException {
            ParallelizingSink.logger.info("Synchronizing " + str);
            this.initializer.await();
            await(str);
        }

        private void await(String str) throws InterruptedException {
            this.latch.countDown();
            if (this.latch.await(this.timeout, TimeUnit.SECONDS)) {
                return;
            }
            ParallelizingSink.logger.log(Level.WARNING, () -> {
                long j = this.timeout;
                return "Timeout after " + j + " seconds on " + j;
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/fiolino/common/processing/sink/ParallelizingSink$Task.class */
    public final class Task implements Runnable {
        private final int number;
        private final BlockingDeque<Object> queue;
        private final Sink<? super T> target;
        private int commitState = -1;
        private ParallelizingSink<T>.Task next;
        private int counter;
        private Throwable lastCause;

        Task(Sink<? super T> sink, int i, int i2) {
            this.number = i;
            this.queue = new LinkedBlockingDeque(i2);
            this.target = sink;
        }

        public String toString() {
            return ParallelizingSink.this.toString() + " #" + this.number + "/" + ParallelizingSink.this.parallelity;
        }

        void setNext(ParallelizingSink<T>.Task task) {
            this.next = task;
        }

        ParallelizingSink<T>.Task offer(T t) throws InterruptedException {
            return offerDirect(t, this);
        }

        private ParallelizingSink<T>.Task offerDirect(T t, ParallelizingSink<T>.Task task) throws InterruptedException {
            ensureRunning();
            return this.queue.offerLast(t) ? this.next : this.next.offerUntilBackAtStart(t, task);
        }

        private ParallelizingSink<T>.Task offerUntilBackAtStart(T t, ParallelizingSink<T>.Task task) throws InterruptedException {
            if (this != task) {
                return offerDirect(t, task);
            }
            insertUnconditional(t);
            return this.next;
        }

        private void ensureRunning() {
            if (isRunning()) {
                return;
            }
            this.commitState = ParallelizingSink.this.commitCount;
            ParallelizingSink.this.executor.accept(this);
        }

        void putIntoAll(SynchronizationPoint synchronizationPoint) throws InterruptedException {
            putIntoMe(synchronizationPoint, this);
        }

        private void putIntoAll(SynchronizationPoint synchronizationPoint, ParallelizingSink<T>.Task task) throws InterruptedException {
            if (this == task) {
                return;
            }
            putIntoMe(synchronizationPoint, task);
        }

        private void putIntoMe(SynchronizationPoint synchronizationPoint, ParallelizingSink<T>.Task task) throws InterruptedException {
            if (isRunning()) {
                synchronizationPoint.register();
                insertUnconditional(synchronizationPoint);
            }
            this.next.putIntoAll(synchronizationPoint, task);
        }

        private boolean isRunning() {
            return this.commitState == ParallelizingSink.this.commitCount;
        }

        private void insertUnconditional(Object obj) throws InterruptedException {
            this.queue.putLast(obj);
        }

        private void handle(Throwable th) {
            if (this.lastCause != null) {
                throwMultiException(th);
            }
            this.lastCause = th;
        }

        private void throwMultiException(Throwable th) {
            ParallelizingSink.logger.log(Level.SEVERE, "Multiple exceptions in parallel tasks!", th);
        }

        void throwError() throws Exception {
            Throwable collect = this.next.collect(this.lastCause, this);
            if (collect == null) {
                return;
            }
            if (collect instanceof Exception) {
                throw ((Exception) collect);
            }
            if (!(collect instanceof Error)) {
                throw new AssertionError(collect);
            }
            throw ((Error) collect);
        }

        private Throwable collect(Throwable th, ParallelizingSink<T>.Task task) {
            Throwable th2 = this.lastCause;
            this.lastCause = null;
            if (task == this) {
                return th;
            }
            if (th != null) {
                if (th2 != null) {
                    throwMultiException(th2);
                }
                th2 = th;
            }
            return this.next.collect(th2, task);
        }

        @Override // java.lang.Runnable
        public void run() {
            setThreadName("evaluating...");
            do {
                try {
                } catch (Throwable th) {
                    ParallelizingSink.logger.log(Level.SEVERE, "Failed " + Thread.currentThread().getName(), th);
                    run();
                }
            } while (evaluate(this.queue.pollFirst()));
            setThreadName("Finished.");
        }

        private void setThreadName(String str) {
            Thread.currentThread().setName(ParallelizingSink.this.nameFor(this) + " " + str + " Updates: " + this.counter + ", commits: " + this.commitState);
        }

        /* JADX WARN: Code restructure failed: missing block: B:0:?, code lost:
        
            r4 = r4;
         */
        /* JADX WARN: Multi-variable type inference failed */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private boolean evaluate(java.lang.Object r4) {
            /*
                r3 = this;
            L0:
                r0 = r4
                if (r0 != 0) goto L1e
                r0 = r3
                org.fiolino.common.processing.sink.ParallelizingSink<T>$Task r0 = r0.next     // Catch: java.lang.InterruptedException -> L54
                r1 = r3
                boolean r0 = r0.stealWorkFor(r1)     // Catch: java.lang.InterruptedException -> L54
                if (r0 == 0) goto L11
                r0 = 1
                return r0
            L11:
                r0 = r3
                java.util.concurrent.BlockingDeque<java.lang.Object> r0 = r0.queue     // Catch: java.lang.InterruptedException -> L54
                java.lang.Object r0 = r0.takeFirst()     // Catch: java.lang.InterruptedException -> L54
                r4 = r0
                goto L0
            L1e:
                r0 = r4
                boolean r0 = r0 instanceof org.fiolino.common.processing.sink.ParallelizingSink.SynchronizationPoint     // Catch: java.lang.InterruptedException -> L54
                if (r0 == 0) goto L4b
            L25:
                r0 = r3
                org.fiolino.common.processing.sink.ParallelizingSink<T>$Task r0 = r0.next     // Catch: java.lang.InterruptedException -> L54
                r1 = r3
                boolean r0 = r0.stealWorkFor(r1)     // Catch: java.lang.InterruptedException -> L54
                if (r0 == 0) goto L33
                goto L25
            L33:
                r0 = r3
                r1 = r4
                org.fiolino.common.processing.sink.ParallelizingSink$SynchronizationPoint r1 = (org.fiolino.common.processing.sink.ParallelizingSink.SynchronizationPoint) r1     // Catch: java.lang.InterruptedException -> L54
                org.fiolino.common.container.Container r1 = r1.getMetadata()     // Catch: java.lang.InterruptedException -> L54
                r0.commit(r1)     // Catch: java.lang.InterruptedException -> L54
                r0 = r4
                org.fiolino.common.processing.sink.ParallelizingSink$SynchronizationPoint r0 = (org.fiolino.common.processing.sink.ParallelizingSink.SynchronizationPoint) r0     // Catch: java.lang.InterruptedException -> L54
                r1 = r3
                java.lang.String r1 = r1.toString()     // Catch: java.lang.InterruptedException -> L54
                r0.syncTask(r1)     // Catch: java.lang.InterruptedException -> L54
                r0 = 0
                return r0
            L4b:
                r0 = r4
                r5 = r0
                r0 = r3
                r1 = r5
                r0.consume(r1)     // Catch: java.lang.InterruptedException -> L54
                r0 = 1
                return r0
            L54:
                r5 = move-exception
                java.util.logging.Logger r0 = org.fiolino.common.processing.sink.ParallelizingSink.logger
                java.lang.Thread r1 = java.lang.Thread.currentThread()
                java.lang.String r1 = r1.getName()
                java.lang.String r1 = "Thread " + r1 + " is being stopped!"
                r0.info(r1)
                r0 = 0
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: org.fiolino.common.processing.sink.ParallelizingSink.Task.evaluate(java.lang.Object):boolean");
        }

        private void commit(Container container) {
            if (this.target instanceof CloneableSink) {
                setThreadName("committing...");
                try {
                    ((CloneableSink) this.target).partialCommit(container);
                } catch (Exception e) {
                    handle(e);
                }
            }
        }

        private void consume(T t) {
            try {
                this.target.accept(t, ParallelizingSink.this.currentMetadata);
            } catch (AssertionError e) {
                handle(e);
            } catch (Error e2) {
                throw e2;
            } catch (Throwable th) {
                handle(th);
            }
            this.counter++;
        }

        int[] getCounters() {
            int[] iArr = new int[ParallelizingSink.this.parallelity];
            setCounter(iArr, 0);
            return iArr;
        }

        private void setCounter(int[] iArr, int i) {
            if (i >= ParallelizingSink.this.parallelity) {
                return;
            }
            iArr[i] = this.counter;
            this.next.setCounter(iArr, i + 1);
        }

        private boolean stealWorkFor(ParallelizingSink<T>.Task task) throws InterruptedException {
            if (this == task) {
                return false;
            }
            Object pollFirst = this.queue.pollFirst();
            if (pollFirst == null || !executeNextStolenWorkIn(pollFirst, task)) {
                return this.next.stealWorkFor(task);
            }
            return true;
        }

        private boolean executeNextStolenWorkIn(Object obj, ParallelizingSink<T>.Task task) throws InterruptedException {
            if (obj instanceof SynchronizationPoint) {
                this.queue.putFirst(obj);
                return false;
            }
            setThreadName("stealing work from " + this.number);
            task.consume(obj);
            return true;
        }
    }

    public static <T> Sink<T> createFor(Sink<T> sink, String str, Consumer<Runnable> consumer, int i, int i2) {
        validateTarget(sink);
        int realParallelity = getRealParallelity(i);
        return realParallelity == 0 ? sink : new ParallelizingSink(sink, str, consumer, realParallelity, i2);
    }

    private static <T> void validateTarget(Sink<? super T> sink) {
        if (!(sink instanceof CloneableSink) && !(sink instanceof ThreadsafeSink)) {
            throw new IllegalArgumentException("Target " + sink + " must be either cloneable or thread safe!");
        }
    }

    private static int getRealParallelity(int i) {
        return i < 0 ? Runtime.getRuntime().availableProcessors() - 1 : i;
    }

    private ParallelizingSink(Sink<? super T> sink, String str, Consumer<Runnable> consumer, int i, int i2) {
        super(sink);
        this.timeout = TimeUnit.MINUTES.toSeconds(5L);
        this.name = str;
        if (!$assertionsDisabled && i < 1) {
            throw new AssertionError();
        }
        if (i2 <= 0) {
            throw new IllegalArgumentException("QueueSize must be > 0: " + i2);
        }
        this.parallelity = i;
        this.executor = consumer;
        ParallelizingSink<T>.Task task = new Task(sink, 1, i2);
        this.next = task;
        for (int i3 = 1; i3 < i; i3++) {
            task = createTask(task, targetForCloning(sink), i2);
        }
        task.setNext(this.next);
    }

    public String toString() {
        return this.name;
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    String nameFor(ParallelizingSink<T>.Task task) {
        return this.name + " #" + ((Task) task).number + "/" + this.parallelity;
    }

    private ParallelizingSink<T>.Task createTask(ParallelizingSink<T>.Task task, Sink<? super T> sink, int i) {
        ParallelizingSink<T>.Task task2 = new Task(sink, ((Task) task).number + 1, i);
        task.setNext(task2);
        return task2;
    }

    @Override // org.fiolino.common.processing.sink.Sink
    public void accept(T t, Container container) throws Exception {
        this.currentMetadata = container;
        try {
            this.next = this.next.offer(t);
        } catch (InterruptedException e) {
            logger.log(Level.WARNING, () -> {
                return "Adding " + t + " to full queue was interrupted!";
            });
            Thread.currentThread().interrupt();
        }
    }

    @Override // org.fiolino.common.processing.sink.ChainedSink, org.fiolino.common.processing.sink.Sink
    public void commit(Container container) throws Exception {
        try {
            SynchronizationPoint synchronizationPoint = new SynchronizationPoint(this.timeout, container);
            this.next.putIntoAll(synchronizationPoint);
            synchronizationPoint.startAndWait(this.name + " <MAIN THREAD>");
            this.commitCount++;
        } catch (InterruptedException e) {
            logger.log(Level.WARNING, () -> {
                return "Interrupted while finishing!";
            });
            Thread.currentThread().interrupt();
        }
        this.next.throwError();
        super.commit(container);
    }

    public int[] getWorkCounters() {
        return this.next.getCounters();
    }

    static {
        $assertionsDisabled = !ParallelizingSink.class.desiredAssertionStatus();
        logger = Logger.getLogger(ParallelizingSink.class.getName());
    }
}
