package org.kinotic.continuum.internal.core.api.aignite;

import io.vertx.core.AsyncResult;
import io.vertx.core.Closeable;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.Cache;
import org.apache.commons.lang3.Validate;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.SqlQuery;
import org.kinotic.continuum.core.api.event.StreamData;
import org.kinotic.continuum.internal.util.IgniteUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kinotic/continuum/internal/core/api/aignite/IgniteContinuousQueryObserver.class */
/**
 * {@link Observer} that streams the contents of, and subsequent updates to, an {@link IgniteCache}
 * by running an Ignite {@link ContinuousQuery}.
 *
 * <p>The observer binds itself to the Vert.x {@link Context} that is current when it is constructed,
 * registers itself as a close hook on that context, and dispatches update events and errors through
 * {@link Context#runOnContext(Handler)} on that same context.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public class IgniteContinuousQueryObserver<K, V> implements Observer<StreamData<K, V>>, Closeable {
    private static final Logger log = LoggerFactory.getLogger(IgniteContinuousQueryObserver.class);

    /** Flipped to {@code true} exactly once, by the first call to {@link #close(Handler)}. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Context observerContext;
    private final Vertx vertx;
    private final IgniteCache<? extends K, ? extends V> igniteCache;
    private Handler<StreamData<K, V>> observerDataHandler;
    private Handler<Throwable> observerExceptionHandler;
    /** Optional initial query; {@code null} when the two-argument constructor was used. */
    private Query<Cache.Entry<K, V>> query;
    /**
     * Looper over the query cursor. It is assigned from the blocking code submitted by
     * {@link #start()} and read by {@link #close(Handler)}, hence volatile.
     */
    private volatile IterableEventLooper<Cache.Entry<K, V>> looper;

    /**
     * Creates an observer without an initial query.
     *
     * @param vertx       the Vert.x instance, must not be {@code null}
     * @param igniteCache the cache to observe, must not be {@code null}
     * @throws IllegalStateException if the current context is a multi-threaded worker context
     */
    public IgniteContinuousQueryObserver(Vertx vertx, IgniteCache<? extends K, ? extends V> igniteCache) {
        Validate.notNull(vertx);
        Validate.notNull(igniteCache);
        this.vertx = vertx;
        this.igniteCache = igniteCache;
        this.observerContext = vertx.getOrCreateContext();
        if (this.observerContext.isMultiThreadedWorkerContext()) {
            throw new IllegalStateException("Cannot use IgniteContinuousQueryObserver in a multi-threaded worker verticle");
        }
        // Register the hook last so that "this" is only handed out once the final fields are assigned.
        this.observerContext.addCloseHook(this);
    }

    /**
     * Creates an observer whose continuous query uses {@code query} as its initial query.
     *
     * @param vertx       the Vert.x instance, must not be {@code null}
     * @param igniteCache the cache to observe, must not be {@code null}
     * @param query       the initial query, must not be {@code null}
     * @throws IllegalStateException if the current context is a multi-threaded worker context
     */
    public IgniteContinuousQueryObserver(Vertx vertx, IgniteCache<? extends K, ? extends V> igniteCache, Query<Cache.Entry<K, V>> query) {
        this(vertx, igniteCache);
        Validate.notNull(query);
        this.query = query;
    }

    /**
     * Sets the handler that receives the data. Must be set before {@link #start()} is called.
     *
     * @param handler the data handler
     * @return this observer
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<StreamData<K, V>> handler(Handler<StreamData<K, V>> handler) {
        this.observerDataHandler = handler;
        return this;
    }

    /**
     * Sets the handler that is notified when the query or event delivery fails.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this observer
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<StreamData<K, V>> exceptionHandler(Handler<Throwable> handler) {
        this.observerExceptionHandler = handler;
        return this;
    }

    /**
     * Accepts a completion handler for interface compatibility. The handler is not stored and is
     * never invoked by this class.
     *
     * @param handler ignored
     * @return this observer
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public Observer<StreamData<K, V>> completionHandler(Handler<Void> handler) {
        return this;
    }

    /**
     * Builds the continuous query and submits it to the cache from blocking code on the observer
     * context.
     *
     * @throws IllegalStateException if no data handler has been set, or the observer is already closed
     */
    @Override // org.kinotic.continuum.internal.core.api.aignite.Observer
    public void start() {
        if (this.observerDataHandler == null) {
            throw new IllegalStateException("You must set the handler before calling start");
        }
        if (this.closed.get()) {
            throw new IllegalStateException("You cannot call start after calling close or after completed");
        }
        // NOTE(review): ContinuousQuery, ConverterHandler and the per-batch IterableEventLooper are used
        // raw, exactly as before; their generic signatures are declared outside this file - confirm
        // before tightening the types.
        ContinuousQuery continuousQuery = new ContinuousQuery();
        if (this.query != null) {
            continuousQuery.setInitialQuery(this.query);
        }
        if (this.query instanceof SqlQuery) {
            // The explicit cast is required: Query<Cache.Entry<K, V>> is not assignable to SqlQuery.
            SqlQuery<?, ?> sqlQuery = (SqlQuery<?, ?>) this.query;
            continuousQuery.setRemoteFilterFactory(new ScriptCacheEntryFilterFactory(sqlQuery.getSql(), sqlQuery.getArgs()));
        }
        continuousQuery.setLocalListener(events -> {
            // Hand every batch of update events over to the observer context.
            this.observerContext.runOnContext(ignored -> {
                IterableEventLooper eventLooper = new IterableEventLooper(this.vertx, events);
                eventLooper.handler(new ConverterHandler(IgniteUtils::cacheEntryEventToStreamData, this.observerDataHandler));
                eventLooper.exceptionHandler(this::invokeObserverExceptionHandler);
                eventLooper.start();
            });
        });
        this.observerContext.executeBlocking(promise -> {
            try {
                IterableEventLooper<Cache.Entry<K, V>> created =
                        new IterableEventLooper<>(this.vertx, this.igniteCache.query(continuousQuery), false);
                // Publish first, then re-check: whichever of this code and close() runs second
                // sees the other's write and closes the looper.
                this.looper = created;
                if (this.closed.get()) {
                    created.close();
                    return;
                }
                created.handler(new ConverterHandler(IgniteUtils::cacheEntryToStreamData, this.observerDataHandler));
                created.exceptionHandler(this::invokeObserverExceptionHandler);
                created.start();
            } catch (Exception e) {
                invokeObserverExceptionHandler(e);
            } finally {
                // Failures are reported through the exception handler, so the blocking task itself
                // always completes successfully.
                promise.complete();
            }
        }, (Handler<AsyncResult<Void>>) null);
    }

    /**
     * Closes this observer and then reports {@code th} to the registered exception handler on the
     * observer context. When no exception handler is registered the error is logged instead.
     *
     * @param th the failure to report
     */
    private void invokeObserverExceptionHandler(Throwable th) {
        try {
            close();
        } catch (Exception e) {
            log.warn("Exception closing IgniteContinuousQueryObserver", e);
        }
        Handler<Throwable> exceptionHandler = this.observerExceptionHandler;
        if (exceptionHandler == null) {
            // Nobody is listening: make sure the failure is at least visible in the log.
            log.error("IgniteContinuousQueryObserver failed and no exception handler is set", th);
            return;
        }
        this.observerContext.runOnContext(ignored -> {
            try {
                exceptionHandler.handle(th);
            } catch (Exception e2) {
                // Log the exception raised by the handler itself, not the one passed to it.
                log.warn("Query Observer's error handler threw an error", e2);
            }
        });
    }

    /** Closes this observer without a completion callback. */
    @Override // java.lang.AutoCloseable
    public void close() {
        close(null);
    }

    /**
     * Closes the looper (if one has been created) and removes this observer from the context's
     * close hooks. Only the first call performs that work; every call notifies {@code handler}.
     *
     * @param handler optional callback that receives a succeeded result, may be {@code null}
     */
    public void close(Handler<AsyncResult<Void>> handler) {
        log.trace("Closing Continuous query");
        if (this.closed.compareAndSet(false, true)) {
            IterableEventLooper<Cache.Entry<K, V>> current = this.looper;
            if (current != null) {
                current.close();
            }
            this.observerContext.removeCloseHook(this);
        }
        if (handler != null) {
            this.vertx.getOrCreateContext().runOnContext(ignored -> handler.handle(Future.succeededFuture()));
        }
    }
}
