/*
 * Decompiled with CFR 0.152.
 */
package org.catools.common.concurrent;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.catools.common.collections.CList;
import org.catools.common.concurrent.CParallelRunner;
import org.catools.common.concurrent.CThreadRunner;
import org.catools.common.concurrent.CTimeBoxRunner;
import org.catools.common.concurrent.exceptions.CThreadTimeoutException;

public class CParallelIO<T> {
    private final AtomicReference<Throwable> throwableReference = new AtomicReference();
    private final AtomicInteger activeInputThreads = new AtomicInteger(0);
    private final AtomicBoolean eof = new AtomicBoolean(false);
    private final Object lock = new Object();
    private final String name;
    private final int parallelInputCount;
    private final int parallelOutputCount;
    private final Long timeout;
    private final TimeUnit unit;
    private final CList<T> sharedQueue = new CList();
    private final CList<T> outputQueue = new CList();
    private CParallelRunner inputExecutor;
    private CParallelRunner outputExecutor;

    public CParallelIO(String name, int parallelInputCount, int parallelOutputCount) {
        this(name, parallelInputCount, parallelOutputCount, null, null);
    }

    public CParallelIO(String name, int parallelInputCount, int parallelOutputCount, Long timeout, TimeUnit unit) {
        this.name = "Parallel IO " + name;
        this.parallelInputCount = parallelInputCount;
        this.parallelOutputCount = parallelOutputCount;
        this.timeout = timeout;
        this.unit = unit;
    }

    public void setInputExecutor(Function<AtomicBoolean, T> inputFunction) {
        this.inputExecutor = new CParallelRunner<Object>(this.name + " Input", this.parallelInputCount, () -> {
            try {
                this.activeInputThreads.incrementAndGet();
                do {
                    Object input = inputFunction.apply(this.eof);
                    this.performActionOnSharedQueue(() -> this.sharedQueue.add(input));
                } while (!this.eof.get() && this.isLive());
            }
            catch (Throwable t) {
                this.throwableReference.set(t);
                throw t;
            }
            finally {
                this.activeInputThreads.decrementAndGet();
            }
            return true;
        });
    }

    public void setOutputExecutor(BiConsumer<AtomicBoolean, T> outputFunction) {
        this.outputExecutor = new CParallelRunner<Object>(this.name + " Output", this.parallelOutputCount, () -> {
            block3: do {
                this.performActionOnSharedQueue(() -> {
                    this.outputQueue.addAll(this.sharedQueue);
                    this.sharedQueue.clear();
                    return true;
                });
                CList<T> clone = new CList<T>();
                CList<T> cList = this.outputQueue;
                synchronized (cList) {
                    clone.addAll(this.outputQueue);
                    this.outputQueue.removeAll(clone);
                }
                for (Object t : clone) {
                    if (this.throwableReference.get() != null) continue block3;
                    outputFunction.accept(this.eof, t);
                }
            } while ((!this.eof.get() || this.activeInputThreads.get() > 0 || this.outputQueue.isNotEmpty() || this.sharedQueue.isNotEmpty()) && this.isLive());
            return true;
        });
    }

    public void run() throws Throwable {
        if (this.timeout == null || this.unit == null) {
            try {
                CThreadRunner.run(() -> {
                    try {
                        this.inputExecutor.invokeAll();
                    }
                    catch (Throwable t) {
                        this.throwableReference.set(t);
                    }
                    finally {
                        this.eof.set(true);
                    }
                });
                this.outputExecutor.invokeAll();
            }
            catch (CThreadTimeoutException e) {
                this.eof.set(true);
                this.inputExecutor.shutdownNow();
                this.outputExecutor.shutdownNow();
                throw e;
            }
            if (this.throwableReference.get() != null) {
                throw this.throwableReference.get();
            }
        } else {
            this.run(this.timeout, this.unit);
        }
    }

    public void run(long timeout, TimeUnit unit) throws Throwable {
        try {
            CTimeBoxRunner.get(() -> {
                CThreadRunner.run(() -> {
                    try {
                        this.inputExecutor.invokeAll(timeout, unit);
                    }
                    catch (Throwable t) {
                        this.throwableReference.set(t);
                    }
                    this.eof.set(true);
                });
                try {
                    this.outputExecutor.invokeAll(timeout, unit);
                }
                catch (Throwable t) {
                    this.throwableReference.set(t);
                }
                return true;
            }, timeout, unit, true);
        }
        catch (CThreadTimeoutException e) {
            this.eof.set(true);
            this.inputExecutor.shutdownNow();
            this.outputExecutor.shutdownNow();
            throw e;
        }
        if (this.throwableReference.get() != null) {
            throw this.throwableReference.get();
        }
    }

    public boolean isStarted() {
        return this.inputExecutor.isStarted() || this.outputExecutor.isStarted();
    }

    public boolean isLive() {
        return !this.isFinished() && !this.isShutdown() && !this.isTerminated() && this.throwableReference.get() == null;
    }

    public boolean isFinished() {
        return this.inputExecutor.isFinished() && this.outputExecutor.isFinished();
    }

    public boolean isShutdown() {
        return this.inputExecutor.isShutdown() && this.outputExecutor.isShutdown();
    }

    public boolean isTerminated() {
        return this.inputExecutor.isTerminated() && this.outputExecutor.isTerminated();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private synchronized <T> T performActionOnSharedQueue(Supplier<T> supplier) {
        Object object = this.lock;
        synchronized (object) {
            return supplier.get();
        }
    }
}

