package pl.edu.icm.unity.store.hz.rdbmsflush;

import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.ILock;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.TransactionalQueue;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ibatis.exceptions.PersistenceException;
import org.apache.logging.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import pl.edu.icm.unity.base.utils.Log;
import pl.edu.icm.unity.store.StorageConfiguration;
import pl.edu.icm.unity.store.StorageEngine;
import pl.edu.icm.unity.store.api.tx.TransactionalRunner;
import pl.edu.icm.unity.store.hz.tx.HzTransactionTL;
import pl.edu.icm.unity.store.rdbms.tx.SQLTransactionTL;

@Component
/* loaded from: input_file:pl/edu/icm/unity/store/hz/rdbmsflush/RDBMSEventSink.class */
public class RDBMSEventSink {
    private static final Logger log = Log.getLogger("unity.server.db", RDBMSEventSink.class);
    public static final String RDBMS_EVENTS_CONSUMER_LOCK = "rdbmsEventsConsumerLock";
    public static final String RDBMS_EVENTS_QUEUE = "rdbmsEventsQueue";
    public static final String INNER_WAIT_LOCK = "rdbmsEventsConsumerInnerLock";
    public static final String INNER_WAIT_CONDITION = "rdbmsEventsConsumerInnerCondition";
    private HazelcastInstance hzInstance;
    private TransactionalRunner hztx;
    private TransactionalRunner rdbmsTx;
    private RDBMSMutationEventProcessor rdbmsProcessor;
    private Thread flushThread;
    private volatile AtomicBoolean stopped = new AtomicBoolean(false);
    private volatile CountDownLatch latch = new CountDownLatch(0);
    private volatile AtomicBoolean working = new AtomicBoolean(false);
    private LinkedList<Exception> asyncProblems = new LinkedList<>();

    @Autowired
    public RDBMSEventSink(StorageConfiguration storageConfiguration, HazelcastInstance hazelcastInstance, @Qualifier("HzTransactionalRunner") TransactionalRunner transactionalRunner, @Qualifier("SQLTransactionalRunner") TransactionalRunner transactionalRunner2, RDBMSMutationEventProcessor rDBMSMutationEventProcessor) {
        this.hzInstance = hazelcastInstance;
        this.hztx = transactionalRunner;
        this.rdbmsTx = transactionalRunner2;
        this.rdbmsProcessor = rDBMSMutationEventProcessor;
        if (storageConfiguration.getEnumValue(StorageConfiguration.ENGINE, StorageEngine.class) == StorageEngine.hz) {
            hazelcastInstance.getLifecycleService().addLifecycleListener(this::onShutdown);
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                stop();
            }));
            this.flushThread = new Thread();
        }
    }

    public synchronized void start() {
        log.info("Starting flush thread");
        if (this.working.get()) {
            throw new IllegalStateException("Can not start events sink while it is already started");
        }
        this.stopped.set(false);
        this.working.set(true);
        this.latch = new CountDownLatch(1);
        this.flushThread = new Thread(() -> {
            awaitAndConsume();
        }, "Hazelcast to RDBMS flush");
        this.flushThread.start();
    }

    private void onShutdown(LifecycleEvent lifecycleEvent) {
        if (lifecycleEvent.getState() == LifecycleEvent.LifecycleState.SHUTTING_DOWN) {
            stop();
        }
    }

    public synchronized void stop() {
        log.info("Stopping flush thread");
        this.stopped.set(true);
        while (this.working.get()) {
            try {
                log.debug("Awaiting for termination");
                this.latch.await();
                break;
            } catch (InterruptedException e) {
            }
        }
        log.debug("Flush thread termiantion await finished");
        if (this.asyncProblems.isEmpty()) {
            return;
        }
        log.error("There where {} problems during flush", Integer.valueOf(this.asyncProblems.size()));
        Exception first = this.asyncProblems.getFirst();
        this.asyncProblems.clear();
        throw new PersistenceException("RDBMS flush failed", first);
    }

    private void awaitAndConsume() {
        ILock lock = this.hzInstance.getLock(RDBMS_EVENTS_CONSUMER_LOCK);
        lock.lock();
        log.info("This member was chosen as the RDBMS flush process");
        while (true) {
            try {
                try {
                    boolean processEvents = processEvents();
                    if (this.stopped.get() && !processEvents) {
                        break;
                    }
                } catch (Exception e) {
                    log.error("Exception when processing events", e);
                    this.asyncProblems.add(e);
                    log.debug("Flush thread is being stopped");
                    this.working.set(false);
                    this.latch.countDown();
                    try {
                        lock.unlock();
                    } catch (Exception e2) {
                        log.debug("Unlocking distributed flush lock failed, typically this is fine as we are exiting", e2);
                    }
                    log.info("This member is exiting and won't flush to RDBMS anymore");
                    return;
                }
            } catch (Throwable th) {
                log.debug("Flush thread is being stopped");
                this.working.set(false);
                this.latch.countDown();
                try {
                    lock.unlock();
                } catch (Exception e3) {
                    log.debug("Unlocking distributed flush lock failed, typically this is fine as we are exiting", e3);
                }
                log.info("This member is exiting and won't flush to RDBMS anymore");
                throw th;
            }
        }
        log.debug("Flush thread is being stopped");
        this.working.set(false);
        this.latch.countDown();
        try {
            lock.unlock();
        } catch (Exception e4) {
            log.debug("Unlocking distributed flush lock failed, typically this is fine as we are exiting", e4);
        }
        log.info("This member is exiting and won't flush to RDBMS anymore");
    }

    private boolean processEvents() {
        return ((Boolean) this.hztx.runInTransactionRet(() -> {
            RDBMSEventsBatch rDBMSEventsBatch;
            TransactionalQueue queue = HzTransactionTL.getHzContext().getQueue(RDBMS_EVENTS_QUEUE);
            do {
                try {
                    rDBMSEventsBatch = (RDBMSEventsBatch) queue.poll(250L, TimeUnit.MILLISECONDS);
                    if (rDBMSEventsBatch != null) {
                        break;
                    }
                } catch (HazelcastInstanceNotActiveException e) {
                    log.debug("Hazelcast instance was shut down, exiting");
                    return false;
                } catch (HazelcastException | InterruptedException e2) {
                    log.debug("Got Interrupt");
                    return Boolean.valueOf(queue.size() > 0);
                }
            } while (!this.stopped.get());
            if (rDBMSEventsBatch != null) {
                processSingleBatch(rDBMSEventsBatch);
            }
            return Boolean.valueOf(queue.size() > 0);
        })).booleanValue();
    }

    private void processSingleBatch(RDBMSEventsBatch rDBMSEventsBatch) {
        log.trace("Got RDBMS events batch for processing, size: {}", Integer.valueOf(rDBMSEventsBatch.getEvents().size()));
        this.rdbmsTx.runInTransaction(() -> {
            Iterator<RDBMSMutationEvent> it = rDBMSEventsBatch.getEvents().iterator();
            while (it.hasNext()) {
                this.rdbmsProcessor.apply(it.next(), SQLTransactionTL.getSql());
            }
        });
        log.trace("RDBMS events batch was flushed");
    }
}
