package io.vlingo.common.completes;

import io.vlingo.common.Completes;
import io.vlingo.common.Scheduler;
import io.vlingo.common.completes.operations.AndThen;
import io.vlingo.common.completes.operations.AndThenConsume;
import io.vlingo.common.completes.operations.AndThenToSource;
import io.vlingo.common.completes.operations.FailureGateway;
import io.vlingo.common.completes.operations.Otherwise;
import io.vlingo.common.completes.operations.OtherwiseConsume;
import io.vlingo.common.completes.operations.Recover;
import io.vlingo.common.completes.operations.TimeoutGateway;
import io.vlingo.common.completes.sinks.InMemorySink;
import io.vlingo.common.completes.sources.InMemorySource;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/vlingo/common/completes/SinkAndSourceBasedCompletes.class */
/**
 * A {@link Completes} implementation assembled from a root {@link Source}, a chain of
 * operations subscribed to one another, and a terminal {@link Sink}.
 *
 * <p>Every chaining method ({@code andThen}, {@code otherwise}, {@code recoverFrom}, ...)
 * subscribes a new operation to {@link #currentOperation}, subscribes the shared sink to that
 * new operation, and returns a <em>new</em> instance that shares the same scheduler, root
 * source and sink but whose current operation is the newly appended one.
 *
 * <p>The operation classes are declared outside this file, so they are used here as raw types;
 * the resulting unchecked warnings are suppressed on the individual members that need them.
 *
 * @param <T> the outcome type seen at the current end of the chain
 */
public class SinkAndSourceBasedCompletes<T> implements Completes<T> {
    /** Timeout used by every overload that does not take an explicit timeout. */
    private static final long DEFAULT_TIMEOUT = Long.MAX_VALUE;

    /** Name of the system property read by {@link #isToggleActive()}. */
    private static final String TOGGLE_PROPERTY = "vlingo.InMemoryCompletes";

    /** Scheduler handed to every {@link TimeoutGateway} created by this instance. */
    private final Scheduler scheduler;

    /** Root of the chain: outcomes and errors are emitted here and activation starts here. */
    public final Source<Object> source;

    /** Last operation appended so far; the next chained operation subscribes to it. */
    private final Source<T> currentOperation;

    /** Terminal sink shared by all instances derived from the same root. */
    private final Sink<T> sink;

    /**
     * Creates an instance over an already wired chain. No subscription is performed here.
     *
     * @param scheduler        scheduler passed on to timeout gateways
     * @param source           root source of the chain
     * @param currentOperation operation the next chained step will subscribe to
     * @param sink             terminal sink of the chain
     */
    protected SinkAndSourceBasedCompletes(
            Scheduler scheduler, Source<Object> source, Source<T> currentOperation, Sink<T> sink) {
        this.scheduler = scheduler;
        this.source = source;
        this.sink = sink;
        this.currentOperation = currentOperation;
    }

    /**
     * Creates a fresh chain: a new {@link InMemorySource} with a new {@link InMemorySink}
     * subscribed directly to it. The source doubles as the current operation.
     *
     * @param scheduler scheduler passed on to timeout gateways
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected SinkAndSourceBasedCompletes(Scheduler scheduler) {
        this.scheduler = scheduler;
        this.source = new InMemorySource();
        this.sink = new InMemorySink();
        this.currentOperation = (Source<T>) this.source;
        this.source.subscribe(this.sink);
    }

    /**
     * Factory for a fresh, empty chain.
     *
     * <p>Delegates to the {@code Scheduler}-only constructor, which performs exactly the
     * source/sink creation and subscription this method used to duplicate.
     *
     * @param scheduler scheduler passed on to timeout gateways
     * @param <T>       outcome type of the new instance
     * @return a new instance whose sink is subscribed directly to its root source
     */
    public static <T> SinkAndSourceBasedCompletes<T> withScheduler(Scheduler scheduler) {
        return new SinkAndSourceBasedCompletes<>(scheduler);
    }

    /**
     * Reads the {@code vlingo.InMemoryCompletes} system property.
     *
     * @return {@code true} only when the property is set and parses as {@code true};
     *         {@code false} when it is absent or has any other value
     */
    public static boolean isToggleActive() {
        return Boolean.parseBoolean(System.getProperty(TOGGLE_PROPERTY, "false"));
    }

    /**
     * Appends a mapping step: current operation, then a {@link TimeoutGateway}, then
     * {@link AndThen}, then a {@link FailureGateway}, then the sink.
     *
     * @param timeout            value given to the {@link TimeoutGateway} (unit is defined there)
     * @param failedOutcomeValue value given to the {@link FailureGateway}; presumably the outcome
     *                           that marks or replaces a failure (confirm in that class)
     * @param function           mapping applied by the {@link AndThen} operation
     * @param <O>                outcome type after the mapping
     * @return a new instance whose current operation is the failure gateway
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <O> Completes<O> andThen(long timeout, O failedOutcomeValue, Function<T, O> function) {
        FailureGateway failureGateway = new FailureGateway(failedOutcomeValue);
        TimeoutGateway timeoutGateway = new TimeoutGateway(this.scheduler, timeout);
        AndThen mapping = new AndThen(function);
        this.currentOperation.subscribe(timeoutGateway);
        timeoutGateway.subscribe(mapping);
        mapping.subscribe(failureGateway);
        // The sink is passed as-is (no InMemorySink downcast), matching andThenConsume: the
        // protected constructor accepts any Sink, so a downcast could fail at runtime.
        failureGateway.subscribe(this.sink);
        return new SinkAndSourceBasedCompletes(this.scheduler, this.source, failureGateway, this.sink);
    }

    /** Same as {@link #andThen(long, Object, Function)} with the default timeout. */
    @Override
    public <O> Completes<O> andThen(O failedOutcomeValue, Function<T, O> function) {
        return andThen(DEFAULT_TIMEOUT, failedOutcomeValue, function);
    }

    /** Same as {@link #andThen(long, Object, Function)} with a {@code null} failure value. */
    @Override
    public <O> Completes<O> andThen(long timeout, Function<T, O> function) {
        return andThen(timeout, null, function);
    }

    /**
     * Same as {@link #andThen(long, Object, Function)} with the default timeout and a
     * {@code null} failure value.
     */
    @Override
    public <O> Completes<O> andThen(Function<T, O> function) {
        return andThen(DEFAULT_TIMEOUT, null, function);
    }

    /**
     * Appends a consuming step: current operation, then a {@link TimeoutGateway}, then
     * {@link AndThenConsume}, then a {@link FailureGateway}, then the sink.
     *
     * @param timeout            value given to the {@link TimeoutGateway}
     * @param failedOutcomeValue value given to the {@link FailureGateway}
     * @param consumer           consumer wrapped by the {@link AndThenConsume} operation
     * @return a new instance whose current operation is the failure gateway
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Completes<T> andThenConsume(long timeout, T failedOutcomeValue, Consumer<T> consumer) {
        FailureGateway failureGateway = new FailureGateway(failedOutcomeValue);
        TimeoutGateway timeoutGateway = new TimeoutGateway(this.scheduler, timeout);
        AndThenConsume consuming = new AndThenConsume(consumer);
        this.currentOperation.subscribe(timeoutGateway);
        timeoutGateway.subscribe(consuming);
        consuming.subscribe(failureGateway);
        failureGateway.subscribe(this.sink);
        return new SinkAndSourceBasedCompletes(this.scheduler, this.source, failureGateway, this.sink);
    }

    /** Same as {@link #andThenConsume(long, Object, Consumer)} with the default timeout. */
    @Override
    public Completes<T> andThenConsume(T failedOutcomeValue, Consumer<T> consumer) {
        return andThenConsume(DEFAULT_TIMEOUT, failedOutcomeValue, consumer);
    }

    /** Same as {@link #andThenConsume(long, Object, Consumer)} with a {@code null} failure value. */
    @Override
    public Completes<T> andThenConsume(long timeout, Consumer<T> consumer) {
        return andThenConsume(timeout, null, consumer);
    }

    /**
     * Same as {@link #andThenConsume(long, Object, Consumer)} with the default timeout and a
     * {@code null} failure value.
     */
    @Override
    public Completes<T> andThenConsume(Consumer<T> consumer) {
        return andThenConsume(DEFAULT_TIMEOUT, null, consumer);
    }

    /**
     * Appends a step whose function yields another {@link Completes}. The function result is
     * cast to {@code Completes} and converted with {@code InMemorySource.fromCompletes} before
     * being handed to {@link AndThenToSource}.
     *
     * <p>The returned object is always a {@code SinkAndSourceBasedCompletes} cast unchecked to
     * {@code O}; callers are expected to choose {@code O} as a {@code Completes} type.
     *
     * @param timeout            value given to the {@link TimeoutGateway}
     * @param failedOutcomeValue value given to the {@link FailureGateway}
     * @param function           function whose result must be a {@link Completes} instance
     * @param <F>                type of the failure value
     * @param <O>                declared return type
     * @return a new instance whose current operation is the failure gateway, cast to {@code O}
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public <F, O> O andThenTo(long timeout, F failedOutcomeValue, Function<T, O> function) {
        FailureGateway failureGateway = new FailureGateway(failedOutcomeValue);
        TimeoutGateway timeoutGateway = new TimeoutGateway(this.scheduler, timeout);
        AndThenToSource flattening = new AndThenToSource(
                function
                        .andThen(outcome -> (Completes) outcome)
                        .andThen(InMemorySource::fromCompletes));
        this.currentOperation.subscribe(timeoutGateway);
        timeoutGateway.subscribe(flattening);
        flattening.subscribe(failureGateway);
        // No InMemorySink downcast here either; see andThen(long, Object, Function).
        failureGateway.subscribe(this.sink);
        return (O) new SinkAndSourceBasedCompletes(this.scheduler, this.source, failureGateway, this.sink);
    }

    /** Same as {@link #andThenTo(long, Object, Function)} with the default timeout. */
    @Override
    public <F, O> O andThenTo(F failedOutcomeValue, Function<T, O> function) {
        return andThenTo(DEFAULT_TIMEOUT, failedOutcomeValue, function);
    }

    /** Same as {@link #andThenTo(long, Object, Function)} with a {@code null} failure value. */
    @Override
    public <O> O andThenTo(long timeout, Function<T, O> function) {
        return andThenTo(timeout, null, function);
    }

    /**
     * Same as {@link #andThenTo(long, Object, Function)} with the default timeout and a
     * {@code null} failure value.
     */
    @Override
    public <O> O andThenTo(Function<T, O> function) {
        return andThenTo(DEFAULT_TIMEOUT, null, function);
    }

    /**
     * Appends an {@link Otherwise} operation between the current operation and the sink.
     * No timeout or failure gateway is inserted for this step.
     *
     * @param function function wrapped by the {@link Otherwise} operation
     * @return a new instance whose current operation is the {@link Otherwise} step
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Completes<T> otherwise(Function<T, T> function) {
        Otherwise fallback = new Otherwise(function);
        this.currentOperation.subscribe(fallback);
        fallback.subscribe(this.sink);
        return new SinkAndSourceBasedCompletes(this.scheduler, this.source, fallback, this.sink);
    }

    /**
     * Appends an {@link OtherwiseConsume} operation between the current operation and the sink.
     *
     * @param consumer consumer wrapped by the {@link OtherwiseConsume} operation
     * @return a new instance whose current operation is the {@link OtherwiseConsume} step
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Completes<T> otherwiseConsume(Consumer<T> consumer) {
        OtherwiseConsume fallback = new OtherwiseConsume(consumer);
        this.currentOperation.subscribe(fallback);
        fallback.subscribe(this.sink);
        return new SinkAndSourceBasedCompletes(this.scheduler, this.source, fallback, this.sink);
    }

    /**
     * Appends a {@link Recover} operation between the current operation and the sink.
     *
     * @param function function wrapped by the {@link Recover} operation
     * @return a new instance whose current operation is the {@link Recover} step
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"})
    public Completes<T> recoverFrom(Function<Exception, T> function) {
        Recover recovery = new Recover(function);
        this.currentOperation.subscribe(recovery);
        recovery.subscribe(this.sink);
        return new SinkAndSourceBasedCompletes(this.scheduler, this.source, recovery, this.sink);
    }

    /** Same as {@link #await(long)} with the default timeout. */
    @Override
    public <O> O await() {
        return await(DEFAULT_TIMEOUT);
    }

    /**
     * Activates the root source and asks the sink for its outcome.
     *
     * @param timeout value forwarded to {@code Sink.await}
     * @param <O>     type the caller expects; the outcome is cast unchecked
     * @return the outcome reported by the sink, or {@code null} when the sink reports none or
     *         when waiting throws
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O> O await(long timeout) {
        this.source.activate();
        try {
            Optional<T> outcome = this.sink.await(timeout);
            return (O) outcome.orElse(null);
        } catch (Exception e) {
            // Best effort by design: a failed wait is reported as "no outcome". An interrupt,
            // however, must stay visible to the caller's thread.
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return null;
        }
    }

    /** @return what the sink reports from {@code hasBeenCompleted()} */
    @Override
    public boolean isCompleted() {
        return this.sink.hasBeenCompleted();
    }

    /** @return what the sink reports from {@code hasFailed()} */
    @Override
    public boolean hasFailed() {
        return this.sink.hasFailed();
    }

    /** Emits an {@link IllegalStateException} as an error on the root source. */
    @Override
    public void failed() {
        this.source.emitError(new IllegalStateException("Forced failure in Completes"));
    }

    /**
     * Emits the given exception as an error on the root source.
     *
     * @param exc the error to emit
     */
    @Override
    public void failed(Exception exc) {
        this.source.emitError(exc);
    }

    /** @return what the sink reports from {@code hasOutcome()} */
    @Override
    public boolean hasOutcome() {
        return this.sink.hasOutcome();
    }

    /**
     * Returns the outcome by delegating to {@link #await()}, i.e. it activates the source and
     * waits using the default timeout.
     *
     * @return the outcome, or {@code null} under the conditions described on {@link #await(long)}
     */
    @Override
    public T outcome() {
        return this.<T>await();
    }

    /**
     * Calls {@code repeat()} on the sink.
     *
     * @return this instance
     */
    @Override
    public Completes<T> repeat() {
        this.sink.repeat();
        return this;
    }

    /**
     * No-op in this implementation: the argument is ignored. Timeouts are supplied per step
     * through the {@code andThen*} overloads instead.
     *
     * @return this instance
     */
    @Override
    public Completes<T> timeoutWithin(long timeout) {
        return this;
    }

    /**
     * No-op in this implementation: the argument is ignored. Failure values are supplied per
     * step through the {@code andThen*} overloads instead.
     *
     * @return this instance
     */
    @Override
    public <F> Completes<T> useFailedOutcomeOf(F failedOutcomeValue) {
        return this;
    }

    /**
     * Appends a pass-through step (the outcome is only cast to {@code O}) and activates the
     * root source.
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O> Completes<O> andFinally() {
        return andFinally(outcome -> (O) outcome);
    }

    /**
     * Appends {@code function} via {@link #andThen(Function)} and then activates the root source.
     *
     * @param function final mapping step
     * @param <O>      outcome type after the mapping
     * @return the instance produced by {@code andThen}
     */
    @Override
    public <O> Completes<O> andFinally(Function<T, O> function) {
        Completes<O> finalStep = andThen(function);
        this.source.activate();
        return finalStep;
    }

    /**
     * Appends {@code consumer} via {@link #andThenConsume(Consumer)} and then activates the
     * root source.
     *
     * @param consumer final consuming step
     */
    @Override
    public void andFinallyConsume(Consumer<T> consumer) {
        andThenConsume(consumer);
        this.source.activate();
    }

    /**
     * Emits {@code outcome} on the root source.
     *
     * @param outcome the value to emit
     * @param <O>     type of the emitted value
     * @return this instance, cast unchecked to {@code Completes<O>}
     */
    @Override
    @SuppressWarnings("unchecked")
    public <O> Completes<O> with(O outcome) {
        this.source.emitOutcome(outcome);
        return (Completes<O>) this;
    }

    @Override
    public String toString() {
        return "SinkAndSourceBasedCompletes{scheduler=" + this.scheduler + ", source=" + this.source + ", currentOperation=" + this.currentOperation + ", sink=" + this.sink + '}';
    }
}
