package io.camunda.db.rdbms.write.queue;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ibatis.session.ExecutorType;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.TransactionIsolationLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/camunda/db/rdbms/write/queue/ExecutionQueue.class */
public class ExecutionQueue {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionQueue.class);
    private final SqlSessionFactory sessionFactory;
    private final List<PreFlushListener> preFlushListeners = new ArrayList();
    private final List<PostFlushListener> postFlushListeners = new ArrayList();
    private final Queue<QueueItem> queue = new ConcurrentLinkedQueue();
    private final long partitionId;
    private final Integer queueFlushLimit;

    public ExecutionQueue(SqlSessionFactory sqlSessionFactory, long j, Integer num) {
        this.sessionFactory = sqlSessionFactory;
        this.partitionId = j;
        this.queueFlushLimit = num;
    }

    public void executeInQueue(QueueItem queueItem) {
        LOG.debug("[RDBMS ExecutionQueue, Partition {}] Added entry to queue: {}", Long.valueOf(this.partitionId), queueItem);
        this.queue.add(queueItem);
        checkQueueForFlush();
    }

    public void registerPreFlushListener(PreFlushListener preFlushListener) {
        this.preFlushListeners.add(preFlushListener);
    }

    public void registerPostFlushListener(PostFlushListener postFlushListener) {
        this.postFlushListeners.add(postFlushListener);
    }

    public void flush() {
        if (this.queue.isEmpty()) {
            LOG.trace("[RDBMS ExecutionQueue, Partition {}] Skip Flushing because execution queue is empty", Long.valueOf(this.partitionId));
            return;
        }
        LOG.debug("[RDBMS ExecutionQueue, Partition {}] Flushing execution queue with {} items", Long.valueOf(this.partitionId), Integer.valueOf(this.queue.size()));
        long currentTimeMillis = System.currentTimeMillis();
        SqlSession openSession = this.sessionFactory.openSession(ExecutorType.BATCH, TransactionIsolationLevel.READ_UNCOMMITTED);
        int i = 0;
        while (!this.queue.isEmpty()) {
            try {
                try {
                    QueueItem peek = this.queue.peek();
                    LOG.trace("[RDBMS ExecutionQueue, Partition {}] Executing entry: {}", Long.valueOf(this.partitionId), peek);
                    openSession.update(peek.statementId(), peek.parameter());
                    this.queue.remove();
                    i++;
                } catch (Exception e) {
                    LOG.error("[RDBMS ExecutionQueue, Partition {}] Error while executing queue", Long.valueOf(this.partitionId), e);
                    openSession.rollback();
                    openSession.close();
                    return;
                }
            } catch (Throwable th) {
                openSession.close();
                throw th;
            }
        }
        if (!this.preFlushListeners.isEmpty()) {
            LOG.debug("[RDBMS ExecutionQueue, Partition {}] Call pre flush listeners", Long.valueOf(this.partitionId));
            this.preFlushListeners.forEach((v0) -> {
                v0.onPreFlush();
            });
        }
        openSession.flushStatements();
        openSession.commit();
        if (!this.postFlushListeners.isEmpty()) {
            LOG.debug("[RDBMS ExecutionQueue, Partition {}] Call post flush listeners", Long.valueOf(this.partitionId));
            this.postFlushListeners.forEach((v0) -> {
                v0.onPostFlush();
            });
        }
        LOG.debug("[RDBMS ExecutionQueue, Partition {}] Commit queue with {} entries in {}ms", new Object[]{Long.valueOf(this.partitionId), Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
        openSession.close();
    }

    private void checkQueueForFlush() {
        LOG.trace("[RDBMS ExecutionQueue, Partition {}] Checking if queue is flushed. Queue size: {}", Long.valueOf(this.partitionId), Integer.valueOf(this.queue.size()));
        if (this.queue.size() >= this.queueFlushLimit.intValue()) {
            flush();
        }
    }
}
