/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process;

import io.activej.async.process.AsyncCloseable;
import io.activej.async.process.ReactiveProcess;
import io.activej.common.Checks;
import io.activej.common.recycle.Recyclers;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.consumer.AbstractChannelConsumer;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.process.ProcessCompleteException;
import io.activej.csp.supplier.AbstractChannelSupplier;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.ImplicitlyReactive;
import io.activej.reactor.Reactive;
import org.jetbrains.annotations.Nullable;

public abstract class AbstractCommunicatingProcess
extends ImplicitlyReactive
implements ReactiveProcess {
    private static final boolean CHECKS = Checks.isEnabled(AbstractCommunicatingProcess.class);
    private boolean processStarted;
    private boolean processComplete;
    private final SettablePromise<Void> processCompletion = new SettablePromise();

    protected void beforeProcess() {
    }

    protected void afterProcess(@Nullable Exception e) {
    }

    public final boolean isProcessStarted() {
        return this.processStarted;
    }

    public final boolean isProcessComplete() {
        return this.processComplete;
    }

    protected final void completeProcess() {
        this.completeProcessEx(null);
    }

    protected final void completeProcessEx(@Nullable Exception e) {
        if (CHECKS) {
            Reactive.checkInReactorThread((Reactive)this);
        }
        if (this.isProcessComplete()) {
            return;
        }
        if (e == null) {
            this.processComplete = true;
            this.processCompletion.trySet(null);
            this.afterProcess(null);
        } else {
            this.closeEx(e);
        }
    }

    public final Promise<Void> getProcessCompletion() {
        return this.processCompletion;
    }

    public final Promise<Void> startProcess() {
        if (CHECKS) {
            Reactive.checkInReactorThread((Reactive)this);
        }
        if (!this.processStarted) {
            this.processStarted = true;
            this.beforeProcess();
            this.doProcess();
        }
        return this.processCompletion;
    }

    protected abstract void doProcess();

    public final void closeEx(Exception e) {
        if (CHECKS) {
            Reactive.checkInReactorThread((Reactive)this);
        }
        if (this.isProcessComplete()) {
            return;
        }
        this.processComplete = true;
        this.doClose(e);
        this.processCompletion.trySetException(e);
        this.afterProcess(e);
    }

    protected abstract void doClose(Exception var1);

    public final void close() {
        super.close();
    }

    protected final <T> ChannelSupplier<T> sanitize(final ChannelSupplier<T> supplier) {
        return new AbstractChannelSupplier<T>(){

            @Override
            protected Promise<T> doGet() {
                return this.sanitize(supplier.get());
            }

            protected void onClosed(Exception e) {
                supplier.closeEx(e);
                AbstractCommunicatingProcess.this.closeEx(e);
            }
        };
    }

    protected final <T> ChannelConsumer<T> sanitize(final ChannelConsumer<T> consumer) {
        return new AbstractChannelConsumer<T>(){

            @Override
            protected Promise<Void> doAccept(@Nullable T item) {
                return this.sanitize(consumer.accept(item));
            }

            protected void onClosed(Exception e) {
                consumer.closeEx(e);
                AbstractCommunicatingProcess.this.closeEx(e);
            }
        };
    }

    protected final BinaryChannelSupplier sanitize(final BinaryChannelSupplier supplier) {
        return new BinaryChannelSupplier(supplier.getBufs()){

            @Override
            public Promise<Void> needMoreData() {
                return this.sanitize(supplier.needMoreData());
            }

            @Override
            public Promise<Void> endOfStream() {
                return this.sanitize(supplier.endOfStream());
            }

            public void onClosed(Exception e) {
                supplier.closeEx(e);
                AbstractCommunicatingProcess.this.closeEx(e);
            }
        };
    }

    protected final <T> Promise<T> sanitize(Promise<T> promise) {
        return promise.async().then(this::doSanitize);
    }

    protected final <T> Promise<T> doSanitize(T value, @Nullable Exception e) {
        if (this.isProcessComplete()) {
            Recyclers.recycle(value);
            ProcessCompleteException processCompleteException = new ProcessCompleteException();
            if (value instanceof AsyncCloseable) {
                ((AsyncCloseable)value).closeEx((Exception)processCompleteException);
            }
            return Promise.ofException((Exception)processCompleteException);
        }
        if (e == null) {
            return Promise.of(value);
        }
        this.closeEx(e);
        return Promise.ofException((Exception)e);
    }
}

