package ai.konduit.serving.pipeline.impl.pipeline.loop;

import ai.konduit.serving.annotation.json.JsonName;
import ai.konduit.serving.pipeline.api.data.Data;
import ai.konduit.serving.pipeline.api.pipeline.Trigger;
import io.swagger.v3.oas.annotations.media.Schema;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import lombok.NonNull;
import org.nd4j.shade.jackson.annotation.JsonIgnoreProperties;
import org.nd4j.shade.jackson.annotation.JsonProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@JsonName("SIMPLE_LOOP_TRIGGER")
@Schema(description = "A simple looping trigger to be used with an AsyncPipeline. It has two modes of operation:<br>(a) Loop continuously with no delay - if frequencyMs is not set, or<br>(b) Loop every frequencyMs milliseconds, if this is set<br>Optionally, a fixed input Data instance may be provided that is fed into the pipeline at each call of the underlying pipeline (when executed in an async manner). If this is not provided, execution is performed using Data.empty() as input.")
@JsonIgnoreProperties({"stop", "thread", "exception", "first", "current", "callbackFn"})
/* loaded from: input_file:ai/konduit/serving/pipeline/impl/pipeline/loop/SimpleLoopTrigger.class */
public class SimpleLoopTrigger implements Trigger {
    private static final Logger log = LoggerFactory.getLogger(SimpleLoopTrigger.class);
    protected final Long frequencyMs;
    protected final Data data;
    protected AtomicBoolean stop;
    protected Thread thread;
    protected Throwable exception;
    protected CountDownLatch first;
    protected volatile Data current;
    protected Function<Data, Data> callbackFn;

    /* loaded from: input_file:ai/konduit/serving/pipeline/impl/pipeline/loop/SimpleLoopTrigger$InferenceRunner.class */
    private class InferenceRunner implements Runnable {
        private final AtomicBoolean stop;
        private final CountDownLatch first;

        protected InferenceRunner(AtomicBoolean atomicBoolean, CountDownLatch countDownLatch) {
            this.stop = atomicBoolean;
            this.first = countDownLatch;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                try {
                    runHelper();
                    if (SimpleLoopTrigger.this.current == null) {
                        this.first.countDown();
                    }
                } catch (Throwable th) {
                    SimpleLoopTrigger.log.error("Uncaught exception in SimpleLoopTrigger.InferenceRunner", th);
                    SimpleLoopTrigger.this.exception = th;
                    SimpleLoopTrigger.this.current = null;
                    if (SimpleLoopTrigger.this.current == null) {
                        this.first.countDown();
                    }
                }
            } catch (Throwable th2) {
                if (SimpleLoopTrigger.this.current == null) {
                    this.first.countDown();
                }
                throw th2;
            }
        }

        public void runHelper() {
            boolean z = SimpleLoopTrigger.this.frequencyMs != null;
            Data empty = SimpleLoopTrigger.this.data == null ? Data.empty() : SimpleLoopTrigger.this.data;
            boolean z2 = true;
            long firstRunDelay = SimpleLoopTrigger.this.firstRunDelay();
            while (!this.stop.get()) {
                if (z2 && firstRunDelay > 0) {
                    try {
                        Thread.sleep(firstRunDelay);
                    } catch (InterruptedException e) {
                        SimpleLoopTrigger.log.error("Received InterruptedException in " + getClass().getName() + " - stopping thread", e);
                        return;
                    }
                }
                long currentTimeMillis = z ? System.currentTimeMillis() : 0L;
                SimpleLoopTrigger.this.current = SimpleLoopTrigger.this.callbackFn.apply(empty);
                if (z2) {
                    this.first.countDown();
                    z2 = false;
                }
                if (z && !this.stop.get()) {
                    long nextStart = SimpleLoopTrigger.this.nextStart(currentTimeMillis);
                    long currentTimeMillis2 = System.currentTimeMillis();
                    if (nextStart > currentTimeMillis2) {
                        try {
                            Thread.sleep(nextStart - currentTimeMillis2);
                        } catch (InterruptedException e2) {
                            if (this.stop.get()) {
                                return;
                            }
                            SimpleLoopTrigger.log.error("Received InterruptedException in SimpleLoopTrigger - stopping thread", e2);
                            return;
                        }
                    } else {
                        continue;
                    }
                }
            }
        }
    }

    public SimpleLoopTrigger() {
        this((Long) null);
    }

    public SimpleLoopTrigger(Integer num) {
        this(num == null ? null : Long.valueOf(num.longValue()));
    }

    public SimpleLoopTrigger(Long l) {
        this(l, null);
    }

    public SimpleLoopTrigger(@JsonProperty("frequencyMs") Long l, @JsonProperty("data") Data data) {
        this.stop = new AtomicBoolean();
        this.first = new CountDownLatch(1);
        this.frequencyMs = l;
        this.data = data;
    }

    @Override // ai.konduit.serving.pipeline.api.pipeline.Trigger
    public Data query(Data data) {
        if (this.stop.get()) {
            throw new IllegalStateException("Unable to get output after trigger has been stopped");
        }
        if (this.current != null) {
            return this.current;
        }
        if (this.exception != null) {
            throw new RuntimeException("Error in Async execution thread", this.exception);
        }
        try {
            this.first.await();
        } catch (InterruptedException e) {
            log.error("Error while waiting for first async result", e);
        }
        if (this.current != null) {
            return this.current;
        }
        if (this.exception != null) {
            throw new RuntimeException("Error in Async execution thread", this.exception);
        }
        throw new RuntimeException("Unknown error occurred: current Data is null but no exception was thrown by async executioner");
    }

    @Override // ai.konduit.serving.pipeline.api.pipeline.Trigger
    public void setCallback(@NonNull Function<Data, Data> function) {
        if (function == null) {
            throw new NullPointerException("callbackFn is marked non-null but is null");
        }
        this.callbackFn = function;
        if (this.thread != null) {
            this.stop.set(true);
            this.thread.interrupt();
        }
        this.stop = new AtomicBoolean();
        this.current = null;
        this.first = new CountDownLatch(1);
        this.thread = new Thread(new InferenceRunner(this.stop, this.first));
        this.thread.setDaemon(true);
        this.thread.start();
    }

    @Override // ai.konduit.serving.pipeline.api.pipeline.Trigger
    public void stop() {
        this.stop.set(true);
        if (this.thread != null) {
            this.thread.interrupt();
        }
    }

    protected long firstRunDelay() {
        return 0L;
    }

    protected long nextStart(long j) {
        return j + this.frequencyMs.longValue();
    }

    public Long getFrequencyMs() {
        return this.frequencyMs;
    }

    public Data getData() {
        return this.data;
    }

    public AtomicBoolean getStop() {
        return this.stop;
    }

    public Thread getThread() {
        return this.thread;
    }

    public Throwable getException() {
        return this.exception;
    }

    public CountDownLatch getFirst() {
        return this.first;
    }

    public Data getCurrent() {
        return this.current;
    }

    public Function<Data, Data> getCallbackFn() {
        return this.callbackFn;
    }

    public void setStop(AtomicBoolean atomicBoolean) {
        this.stop = atomicBoolean;
    }

    public void setThread(Thread thread) {
        this.thread = thread;
    }

    public void setException(Throwable th) {
        this.exception = th;
    }

    public void setFirst(CountDownLatch countDownLatch) {
        this.first = countDownLatch;
    }

    public void setCurrent(Data data) {
        this.current = data;
    }

    public void setCallbackFn(Function<Data, Data> function) {
        this.callbackFn = function;
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof SimpleLoopTrigger)) {
            return false;
        }
        SimpleLoopTrigger simpleLoopTrigger = (SimpleLoopTrigger) obj;
        if (!simpleLoopTrigger.canEqual(this)) {
            return false;
        }
        Long frequencyMs = getFrequencyMs();
        Long frequencyMs2 = simpleLoopTrigger.getFrequencyMs();
        if (frequencyMs == null) {
            if (frequencyMs2 != null) {
                return false;
            }
        } else if (!frequencyMs.equals(frequencyMs2)) {
            return false;
        }
        Data data = getData();
        Data data2 = simpleLoopTrigger.getData();
        return data == null ? data2 == null : data.equals(data2);
    }

    protected boolean canEqual(Object obj) {
        return obj instanceof SimpleLoopTrigger;
    }

    public int hashCode() {
        Long frequencyMs = getFrequencyMs();
        int hashCode = (1 * 59) + (frequencyMs == null ? 43 : frequencyMs.hashCode());
        Data data = getData();
        return (hashCode * 59) + (data == null ? 43 : data.hashCode());
    }

    public String toString() {
        return "SimpleLoopTrigger(frequencyMs=" + getFrequencyMs() + ")";
    }
}
