package org.kinotic.continuum.internal.core.api.aignite;

import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kinotic/continuum/internal/core/api/aignite/IteratorEventLooper.class */
/**
 * Pumps the elements of an {@link Iterator} to a data handler, one element per loop iteration,
 * using the Vert.x {@link Context} that was current when the instance was constructed.
 *
 * <p>Each iteration runs on the creating context: {@link Iterator#hasNext()} is evaluated there,
 * {@link Iterator#next()} is evaluated through {@code Context.executeBlocking}, and the fetched
 * element is then handed to the data handler back on the creating context. When the iterator is
 * exhausted the looper closes itself and notifies the completion handler; when fetching an element
 * or the data handler fails, the looper closes itself and notifies the exception handler.
 *
 * <p>The looper registers itself as a close hook on the creating context and removes that hook
 * when it is closed.
 *
 * @param <T> the type of the elements produced by the iterator
 */
public class IteratorEventLooper<T> implements Handler<Void>, SuspendableObserver<T>, Closeable {
    private static final Logger log = LoggerFactory.getLogger(IteratorEventLooper.class);
    private final Vertx vertx;
    private final Context creatingContext;
    private final AtomicBoolean suspended = new AtomicBoolean(false);
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** True while a loop iteration is queued or in progress; prevents two loops draining one iterator. */
    private final AtomicBoolean loopActive = new AtomicBoolean(false);
    /** Volatile because close() clears it and the loop reads it; the loop always works on a local copy. */
    private volatile Iterator<T> iterator;
    private Handler<T> dataHandler;
    private Handler<Void> completionHandler;
    private Handler<Throwable> exceptionHandler;

    /**
     * Creates a looper bound to the context returned by {@code vertx.getOrCreateContext()}.
     *
     * @param vertx the Vert.x instance, must not be null
     * @param it    the iterator to drain, must not be null
     * @throws NullPointerException  if either argument is null
     * @throws IllegalStateException if the current context is a multi-threaded worker context
     */
    public IteratorEventLooper(Vertx vertx, Iterator<T> it) {
        Validate.notNull(vertx, "Vertx is null");
        Validate.notNull(it, "Iterator is null");
        this.vertx = vertx;
        this.iterator = it;
        this.creatingContext = vertx.getOrCreateContext();
        if (this.creatingContext.isMultiThreadedWorkerContext()) {
            throw new IllegalStateException("Cannot use IteratorEventLooper in a multi-threaded worker verticle");
        }
        this.creatingContext.addCloseHook(this);
    }

    /**
     * Sets the handler that receives each element. Must be set before {@link #start()}.
     *
     * @param handler the data handler
     * @return this instance
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<T> handler(Handler<T> handler) {
        this.dataHandler = handler;
        return this;
    }

    /**
     * Sets the handler notified (with {@code null}) once the iterator has no more elements.
     *
     * @param handler the completion handler, may be null
     * @return this instance
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<T> completionHandler(Handler<Void> handler) {
        this.completionHandler = handler;
        return this;
    }

    /**
     * Sets the handler notified when fetching an element or the data handler fails.
     *
     * @param handler the exception handler, may be null
     * @return this instance
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Pauses the loop. An element whose fetch is already in flight is still delivered; no further
     * element is fetched until {@link #resume()} is called.
     *
     * @return this instance
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.SuspendableObserver
    public SuspendableObserver<T> suspend() {
        this.suspended.set(true);
        return this;
    }

    /**
     * Resumes a suspended loop. Has no effect when the looper is not currently suspended.
     *
     * @return this instance
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.SuspendableObserver
    public SuspendableObserver<T> resume() {
        if (this.suspended.compareAndSet(true, false)) {
            doLoop();
        }
        return this;
    }

    /**
     * Runs one loop iteration: fetches the next element (if any) and schedules its delivery.
     * Invoked by the creating context; not intended to be called directly.
     *
     * @param event unused
     */
    @Override
    public void handle(Void event) {
        // Work on a local copy: close() may clear the field while an iteration is in progress.
        final Iterator<T> source = this.iterator;
        if (this.closed.get() || source == null || this.suspended.get()) {
            parkLoop();
            return;
        }
        if (!source.hasNext()) {
            finish();
            return;
        }
        this.creatingContext.<T>executeBlocking(promise -> {
            try {
                promise.complete(source.next());
            } catch (Exception e) {
                promise.fail(e);
            }
        }, asyncResult -> {
            if (asyncResult.succeeded()) {
                T item = asyncResult.result();
                this.creatingContext.runOnContext(v -> deliver(item));
            } else {
                fail(asyncResult.cause());
            }
        });
    }

    /** Hands one fetched element to the data handler and queues the next iteration. */
    private void deliver(T item) {
        if (this.closed.get()) {
            // Closed while the element was being fetched: no events are emitted after close.
            return;
        }
        try {
            this.dataHandler.handle(item);
            // Continue the already-active loop directly; loopActive stays true so a
            // suspend()/resume() pair issued by the handler cannot start a second loop.
            this.creatingContext.runOnContext(this);
        } catch (Exception e) {
            log.warn("IteratorEventLooper's data handler threw an error, terminating IteratorEventLooper!", e);
            fail(e);
        }
    }

    /** Closes the looper and reports the cause to the exception handler, if one is set. */
    private void fail(Throwable cause) {
        close();
        final Handler<Throwable> onError = this.exceptionHandler;
        if (onError != null) {
            this.creatingContext.runOnContext(v -> onError.handle(cause));
        }
    }

    /** Closes the looper and notifies the completion handler, if one is set. */
    private void finish() {
        close();
        final Handler<Void> onComplete = this.completionHandler;
        if (onComplete != null) {
            this.creatingContext.runOnContext(v -> onComplete.handle(null));
        }
    }

    /**
     * Marks the loop as idle after an iteration found the looper suspended or closed.
     * Re-checks the state afterwards so a resume() that raced with the idle transition
     * (and therefore saw the loop as still active) is not lost.
     */
    private void parkLoop() {
        this.loopActive.set(false);
        if (!this.closed.get() && !this.suspended.get()) {
            doLoop();
        }
    }

    /** Queues a loop iteration on the creating context unless one is already queued or running. */
    private void doLoop() {
        if (this.loopActive.compareAndSet(false, true)) {
            this.creatingContext.runOnContext(this);
        }
    }

    /**
     * Starts the loop.
     *
     * @throws IllegalStateException if no data handler has been set, or the looper is already closed
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public void start() {
        if (this.dataHandler == null) {
            throw new IllegalStateException("You must set the handler before calling start");
        }
        if (this.closed.get()) {
            throw new IllegalStateException("You cannot call start after calling close or after completed");
        }
        doLoop();
    }

    /** Closes the looper; equivalent to {@code close(null)}. */
    @Override // java.lang.AutoCloseable
    public void close() {
        close(null);
    }

    /**
     * Closes the looper: stops the loop, releases the iterator reference and removes the close
     * hook from the creating context. Safe to call more than once; the clean-up runs only on the
     * first call, while the supplied handler is notified on every call.
     *
     * @param handler notified with a succeeded result on the caller's context, may be null
     */
    @Override
    public void close(Handler<AsyncResult<Void>> handler) {
        if (this.closed.compareAndSet(false, true)) {
            this.iterator = null;
            this.creatingContext.removeCloseHook(this);
        }
        if (handler != null) {
            this.vertx.getOrCreateContext().runOnContext(v -> handler.handle(Future.succeededFuture()));
        }
    }
}
