package nstream.adapter.common.schedule;

import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import nstream.adapter.common.schedule.StageService;
import swim.api.agent.AbstractAgent;
import swim.concurrent.AbstractTask;
import swim.concurrent.TaskRef;
import swim.concurrent.TimerRef;

/* loaded from: input_file:nstream/adapter/common/schedule/ExecutorAgent.class */
/**
 * {@link StageService} implementation backed by a Swim agent.
 *
 * <p>Work is wrapped in tasks created on {@code asyncStage()} and, for the
 * {@code schedule*} family, driven by timers created through
 * {@code setTimer}. Every task built here answers {@code true} from
 * {@code taskWillBlock()}. Failures are reported through {@code didFail};
 * none of the methods in this class rethrow them.
 *
 * <p>The {@code Supplier<TimerRef>} / {@code Supplier<TaskRef>} arguments are
 * read lazily, at the time a task or timer actually runs. Callers are
 * presumably expected to make the supplier return the reference handed back
 * by the corresponding method of this class (TODO confirm against callers).
 */
public class ExecutorAgent extends AbstractAgent implements StageService {

    /**
     * Creates a task on the async stage that runs {@code runnable}, and cues
     * it immediately.
     *
     * @param runnable the work to run; any {@link Exception} it throws is
     *                 passed to {@code didFail}
     */
    @Override
    public void execute(final Runnable runnable) {
        asyncStage().task(new AbstractTask() {
            @Override
            public void runTask() {
                try {
                    runnable.run();
                } catch (Exception e) {
                    ExecutorAgent.this.didFail(e);
                }
            }

            @Override
            public boolean taskWillBlock() {
                return true;
            }
        }).cue();
    }

    /**
     * Hands {@code runnable} directly to the async stage's {@code execute},
     * without wrapping it in a task.
     *
     * @param runnable the work to run; any {@link Exception} it throws is
     *                 passed to {@code didFail}
     */
    @Override
    public void executeNonblocking(Runnable runnable) {
        asyncStage().execute(() -> {
            try {
                runnable.run();
            } catch (Exception e) {
                didFail(e);
            }
        });
    }

    /**
     * Cancels the timer currently supplied by {@code supplier} (if any) and
     * sets a new timer that cues a single run of {@code stageRunnable}.
     *
     * @param supplier      yields the timer to cancel now and, on a
     *                      non-deferrable failure, the timer to cancel then
     * @param j             delay handed to {@code setTimer}
     * @param stageRunnable the work to run when the timer fires
     * @return the newly set timer
     */
    @Override
    public TimerRef schedule(Supplier<TimerRef> supplier, long j, StageService.StageRunnable stageRunnable) {
        cancelTimer(supplier);
        final TaskRef task = ballisticTask(supplier, stageRunnable);
        return setTimer(j, task::cue);
    }

    /**
     * Cancels the timer currently supplied by {@code supplier} (if any) and
     * sets a new timer that, each time it fires, cues {@code stageRunnable}
     * and then reschedules the supplied timer by {@code j2} without waiting
     * for the cued run to finish.
     *
     * @param supplier      yields the timer to cancel now and to reschedule
     *                      on each firing
     * @param j             initial delay handed to {@code setTimer}
     * @param j2            delay handed to {@code reschedule} on each firing
     * @param stageRunnable the work to run on each firing
     * @return the newly set timer
     */
    @Override
    public TimerRef scheduleAtFixedRate(Supplier<TimerRef> supplier, long j, long j2, StageService.StageRunnable stageRunnable) {
        cancelTimer(supplier);
        final TaskRef task = ballisticTask(supplier, stageRunnable);
        return setTimer(j, () -> {
            task.cue();
            // Rescheduling happens from the timer callback, independent of task completion.
            supplier.get().reschedule(j2);
        });
    }

    /**
     * Cancels the timer currently supplied by {@code supplier} (if any) and
     * sets a new timer that cues {@code stageRunnable}; the supplied timer is
     * rescheduled by {@code j2} only after a run has finished (normally or
     * with a {@code DeferrableException}).
     *
     * @param supplier      yields the timer to cancel now and to reschedule
     *                      after each run
     * @param j             initial delay handed to {@code setTimer}
     * @param j2            delay handed to {@code reschedule} after each run
     * @param stageRunnable the work to run on each firing
     * @return the newly set timer
     */
    @Override
    public TimerRef scheduleWithFixedDelay(Supplier<TimerRef> supplier, long j, long j2, StageService.StageRunnable stageRunnable) {
        cancelTimer(supplier);
        final TaskRef task = delayingTask(supplier, stageRunnable, j2);
        return setTimer(j, task::cue);
    }

    /**
     * Cancels the timer currently supplied by {@code supplier} (if any) and
     * sets a new timer that cues a task whose next delay is computed from
     * each result; see {@code backoffTask} for the per-run rules.
     *
     * @param supplier      yields the timer to cancel / reschedule
     * @param j             initial delay handed to {@code setTimer}
     * @param biFunction    maps (result, previous backoff) to the next
     *                      backoff; the previous backoff is {@code -1} when
     *                      none is recorded
     * @param function      when it returns {@code true} for a result, the
     *                      recorded previous backoff is reset to {@code -1}
     * @param j2            delay used to reschedule after a
     *                      {@code DeferrableException}
     * @param stageCallable produces the result of each run
     * @param stageConsumer receives the result of each run
     * @param <V>           the result type
     * @return the newly set timer
     */
    @Override
    public <V> TimerRef scheduleWithInformedBackoff(Supplier<TimerRef> supplier, long j, BiFunction<V, Long, Long> biFunction, Function<V, Boolean> function, long j2, StageService.StageCallable<V> stageCallable, StageService.StageConsumer<V> stageConsumer) {
        cancelTimer(supplier);
        final TaskRef task = backoffTask(supplier, biFunction, function, j2, stageCallable, stageConsumer);
        return setTimer(j, task::cue);
    }

    /**
     * Builds (but does not cue) a task that runs {@code stageRunnable} and,
     * when that completes normally, cues whatever task {@code supplier}
     * yields at that moment.
     *
     * <p>A {@code DeferrableException} is passed to
     * {@link #handleDeferrableException(Exception)} and any other
     * {@link Exception} is reported as a non-deferrable failure; in both
     * cases this method does not cue the supplied task again.
     *
     * @param supplier      yields the task to cue after a successful run
     * @param stageRunnable the loop body
     * @return the prepared task
     */
    @Override
    public TaskRef prepareLoop(final Supplier<TaskRef> supplier, final StageService.StageRunnable stageRunnable) {
        return asyncStage().task(new AbstractTask() {
            @Override
            public void runTask() {
                try {
                    stageRunnable.run();
                    supplier.get().cue();
                } catch (DeferrableException e) {
                    ExecutorAgent.this.handleDeferrableException(e);
                } catch (Exception e) {
                    ExecutorAgent.this.didFailAndCancel(e);
                }
            }

            @Override
            public boolean taskWillBlock() {
                return true;
            }
        });
    }

    /**
     * Hook invoked when scheduled work throws a {@code DeferrableException}.
     * This implementation forwards the exception to {@code didFail}.
     *
     * @param exc the deferrable failure
     */
    @Override
    public void handleDeferrableException(Exception exc) {
        didFail(exc);
    }

    /** Builds a task that runs {@code work} and does nothing afterwards. */
    private TaskRef ballisticTask(Supplier<TimerRef> timer, StageService.StageRunnable work) {
        return executorTask(timer, work, () -> {
        });
    }

    /** Builds a task that runs {@code work} and then reschedules the supplied timer by {@code delay}. */
    private TaskRef delayingTask(Supplier<TimerRef> timer, StageService.StageRunnable work, long delay) {
        return executorTask(timer, work, () -> timer.get().reschedule(delay));
    }

    /**
     * Builds a task that runs {@code work} followed by {@code afterRun}.
     *
     * <p>A {@code DeferrableException} from {@code work} is passed to
     * {@link #handleDeferrableException(Exception)} and {@code afterRun}
     * still executes. A {@link RuntimeException} cancels the supplied timer,
     * is reported as a non-deferrable failure, and skips {@code afterRun}.
     */
    private TaskRef executorTask(final Supplier<TimerRef> timer, final StageService.StageRunnable work, final Runnable afterRun) {
        return asyncStage().task(new AbstractTask() {
            @Override
            public void runTask() {
                try {
                    work.run();
                } catch (DeferrableException e) {
                    ExecutorAgent.this.handleDeferrableException(e);
                } catch (RuntimeException e) {
                    // Null-safe cancel: a missing timer must not replace the
                    // original failure with a NullPointerException.
                    ExecutorAgent.this.cancelTimer(timer);
                    ExecutorAgent.this.didFailAndCancel(e);
                    return;
                }
                afterRun.run();
            }

            @Override
            public boolean taskWillBlock() {
                return true;
            }
        });
    }

    /**
     * Builds the task behind {@code scheduleWithInformedBackoff}.
     *
     * <p>Each run: obtains a result from {@code callable}; resets the recorded
     * backoff to {@code -1} if {@code resetCheck} returns {@code true};
     * passes the result to {@code consumer}; then asks {@code nextBackoff}
     * for the next delay given the result and the recorded backoff.
     * <ul>
     *   <li>negative delay: the supplied timer is canceled and the run ends;</li>
     *   <li>zero delay: the whole sequence repeats immediately within the
     *       same task run;</li>
     *   <li>positive delay: it is recorded and the supplied timer is
     *       rescheduled by it.</li>
     * </ul>
     * A {@code DeferrableException} is passed to
     * {@link #handleDeferrableException(Exception)} and the supplied timer is
     * rescheduled by {@code deferDelay}. Any other {@link Exception} cancels
     * the supplied timer and is reported as a non-deferrable failure.
     */
    private <V> TaskRef backoffTask(final Supplier<TimerRef> timer, final BiFunction<V, Long, Long> nextBackoff, final Function<V, Boolean> resetCheck, final long deferDelay, final StageService.StageCallable<V> callable, final StageService.StageConsumer<V> consumer) {
        return asyncStage().task(new AbstractTask() {
            private volatile long previousBackoff = -1;

            @Override
            public void runTask() {
                long backoff;
                do {
                    try {
                        final V result = callable.call();
                        if (resetCheck.apply(result)) {
                            this.previousBackoff = -1L;
                        }
                        consumer.accept(result);
                        backoff = nextBackoff.apply(result, this.previousBackoff);
                        if (backoff < 0L) {
                            ExecutorAgent.this.cancelTimer(timer);
                            return;
                        }
                    } catch (DeferrableException e) {
                        ExecutorAgent.this.handleDeferrableException(e);
                        timer.get().reschedule(deferDelay);
                        return;
                    } catch (Exception e) {
                        // Null-safe cancel so the original failure always reaches didFail.
                        ExecutorAgent.this.cancelTimer(timer);
                        ExecutorAgent.this.didFailAndCancel(e);
                        return;
                    }
                    // Negative values returned above, so this repeats only for a zero backoff.
                } while (backoff <= 0L);
                this.previousBackoff = backoff;
                timer.get().reschedule(backoff);
            }

            @Override
            public boolean taskWillBlock() {
                return true;
            }
        });
    }

    /**
     * Reports {@code th} through {@code didFail}, wrapped in a
     * {@link RuntimeException} whose message is prefixed with
     * {@code nodeUri()}. This method does not cancel anything itself; callers
     * cancel the relevant timer before invoking it.
     */
    private void didFailAndCancel(Throwable th) {
        didFail(new RuntimeException(nodeUri() + ": non-deferrable ExecutorAgent failure; canceled associated timer", th));
    }

    /** Cancels the timer yielded by {@code supplier}, doing nothing when it yields {@code null}. */
    private void cancelTimer(Supplier<TimerRef> supplier) {
        final TimerRef current = supplier.get();
        if (current != null) {
            current.cancel();
        }
    }
}
