/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process.transformer;

import io.activej.common.Checks;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.promise.Promise;
import io.activej.reactor.Reactive;

public abstract class AbstractChannelTransformer<S extends AbstractChannelTransformer<S, I, O>, I, O>
extends AbstractCommunicatingProcess
implements WithChannelTransformer<S, I, O> {
    private static final boolean CHECKS = Checks.isEnabled(AbstractChannelTransformer.class);
    protected ChannelSupplier<I> input;
    protected ChannelConsumer<O> output;

    protected final Promise<Void> send(O item) {
        return this.output.accept(item);
    }

    protected final Promise<Void> sendEndOfStream() {
        return this.output.acceptEndOfStream();
    }

    protected abstract Promise<Void> onItem(I var1);

    protected Promise<Void> onProcessFinish() {
        return this.sendEndOfStream();
    }

    protected Promise<Void> onProcessStart() {
        return Promise.complete();
    }

    @Override
    protected void beforeProcess() {
        Checks.checkState((this.input != null ? 1 : 0) != 0, (Object)"Input was not set");
        Checks.checkState((this.output != null ? 1 : 0) != 0, (Object)"Output was not set");
    }

    @Override
    protected void doProcess() {
        Promise.complete().then(this::onProcessStart).then(() -> this.input.streamTo(ChannelConsumers.ofAsyncConsumer(this::onItem))).then(this::onProcessFinish).whenResult(() -> this.completeProcess()).whenException(this::closeEx);
    }

    @Override
    public ChannelInput<I> getInput() {
        return input -> {
            if (CHECKS) {
                Reactive.checkInReactorThread((Reactive)this);
            }
            this.input = this.sanitize(input);
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
            return this.getProcessCompletion();
        };
    }

    @Override
    public ChannelOutput<O> getOutput() {
        return output -> {
            if (CHECKS) {
                Reactive.checkInReactorThread((Reactive)this);
            }
            this.output = this.sanitize(output);
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
        };
    }

    @Override
    protected final void doClose(Exception e) {
        this.reactor.post(this::onCleanup);
        this.input.closeEx(e);
        this.output.closeEx(e);
    }

    protected void onCleanup() {
    }
}

