package com.addc.commons.queue;

import com.addc.commons.Constants;
import com.addc.commons.database.DatabaseException;
import com.addc.commons.database.derby.DerbyDatabase;
import com.addc.commons.i18n.I18nTextFactory;
import com.addc.commons.jmx.MBeanServerHelper;
import com.addc.commons.queue.persistence.DefaultElementSerializer;
import com.addc.commons.queue.persistence.PersistentQueue;
import com.addc.commons.queue.persistence.PersistentQueueException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addc/commons/queue/PersistingQueue.class */
/**
 * A bounded in-memory queue that can optionally overflow into a Derby-backed
 * {@link PersistentQueue}.
 *
 * <p>Elements are kept in a {@link LinkedBlockingQueue} of fixed capacity. When the buffer is
 * full, a persistent instance writes the element to the persistent queue, while a
 * non-persistent instance retries a few times with short sleeps and then drops the element.
 *
 * <p>Thread-safety: buffer mutations made by {@link #put(Object)}, {@link #poll()} and
 * {@link #shutdown(Object)} are serialised on an internal buffer lock; {@link #take()} waits
 * on a separate lock that {@link #put(Object)} signals. Listeners are invoked on the calling
 * thread, and for {@code put}/{@code poll} while the buffer lock is held.
 *
 * @param <T> the element type
 */
public class PersistingQueue<T> {
    /** Sleep times in milliseconds between offer retries when a non-persistent buffer is full. */
    private static final Long[] RETRY_DELAYS = {10L, 13L, 18L, 25L};
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistingQueue.class);
    private static final String MESSAGES_BUNDLE = "com.addc.commons.Messages";
    /** Maximum time in milliseconds that take() waits before polling again. */
    private static final long TAKE_WAIT_MILLIS = 500L;

    /** Backing store for overflow; {@code null} when the queue is not persistent. */
    private final PersistentQueue<T> persistentQueue;
    private final LinkedBlockingQueue<T> payloadBuffer;
    private final Object bufferLock = new Object();
    private final Object takeLock = new Object();
    private final List<Long> throttleDelays;
    private final PersistingQueueStatistics<T> statistics;
    // Copy-on-write lists: listeners are iterated on every put/poll and may be added
    // concurrently; iterating a Collections.synchronizedList without holding its monitor
    // is not safe.
    private final List<PersistingQueueStateListener> listeners = new CopyOnWriteArrayList<>();
    private final List<PersistingQueueTransitionListener<T>> transitionListeners = new CopyOnWriteArrayList<>();
    private final boolean dumpStatisticsOnExit;
    /** Written under bufferLock by shutdown(), read without a lock elsewhere, hence volatile. */
    private volatile boolean shutdown;
    /** Guarded by takeLock. */
    private boolean interruptTake;

    /**
     * Creates a queue using the default element serializer that does not dump its statistics
     * on shutdown.
     *
     * @param persistenceConfig decides whether a persistent queue is used and where it lives
     * @param persistingQueueStatistics statistics sink, also registered as an MBean
     * @param i capacity of the in-memory buffer
     * @throws DatabaseException declared by the delegated constructor
     * @throws PersistentQueueException declared by the delegated constructor
     */
    public PersistingQueue(PersistenceConfig persistenceConfig, PersistingQueueStatistics<T> persistingQueueStatistics, int i) throws DatabaseException, PersistentQueueException {
        this(persistenceConfig, persistingQueueStatistics, false, i, null);
    }

    /**
     * Creates a queue.
     *
     * <p>If {@code persistenceConfig.isPersistent()} is true, a {@link DerbyDatabase} is opened
     * and the buffer is primed from it; otherwise the queue is memory-only. The statistics
     * object is registered as a standard MBean under its own object name.
     *
     * @param persistenceConfig decides whether a persistent queue is used and where it lives
     * @param persistingQueueStatistics statistics sink, also registered as an MBean
     * @param z whether the statistics are logged when the queue is shut down
     * @param i capacity of the in-memory buffer; {@link LinkedBlockingQueue} rejects values
     *        that are not greater than zero with an {@link IllegalArgumentException}
     * @param elementSerializer serializer for persisted elements, or {@code null} to use
     *        {@link DefaultElementSerializer}
     * @throws DatabaseException declared for the database/persistent queue set-up
     * @throws PersistentQueueException declared for the database/persistent queue set-up
     */
    public PersistingQueue(PersistenceConfig persistenceConfig, PersistingQueueStatistics<T> persistingQueueStatistics, boolean z, int i, ElementSerializer<T> elementSerializer) throws DatabaseException, PersistentQueueException {
        this.dumpStatisticsOnExit = z;
        this.payloadBuffer = new LinkedBlockingQueue<>(i);
        this.statistics = persistingQueueStatistics;
        this.throttleDelays = Arrays.asList(RETRY_DELAYS);
        if (persistenceConfig.isPersistent()) {
            DerbyDatabase derbyDatabase = new DerbyDatabase(persistenceConfig.getDerbyDbDir(), persistenceConfig.isEncrypted());
            if (elementSerializer == null) {
                this.persistentQueue = new PersistentQueue<>(derbyDatabase, new DefaultElementSerializer());
            } else {
                this.persistentQueue = new PersistentQueue<>(derbyDatabase, elementSerializer);
            }
            LOGGER.info("Using persistent queue");
            fillBufferFromPersistentQueue();
        } else {
            this.persistentQueue = null;
        }
        MBeanServerHelper.getInstance().registerStandardMBean(persistingQueueStatistics, persistingQueueStatistics.getObjectName());
        LOGGER.info("Created Persisting Queue");
    }

    /**
     * Adds an element to the queue and signals one thread waiting in {@link #take()}.
     *
     * <p>If the buffer is full the element is written to the persistent queue when one is
     * configured; otherwise the offer is retried after each of the throttle delays (while the
     * buffer lock is held) and the element is dropped if every retry fails.
     *
     * @param t the element to add
     * @throws IllegalStateException if the queue has been shut down
     */
    public void put(T t) {
        ensureNotShutdown();
        synchronized (this.bufferLock) {
            // Re-check under the lock so an element cannot slip in behind a concurrent shutdown().
            ensureNotShutdown();
            LOGGER.debug("{}", t);
            this.statistics.itemCreated(t);
            if (this.payloadBuffer.offer(t)) {
                checkStateAndNotify();
            } else if (isPersistent()) {
                saveToPersistentQueue(t);
            } else {
                noPersistencePostProcessing(t, offerWithThrottling(t));
            }
        }
        LOGGER.trace("Signal take()");
        synchronized (this.takeLock) {
            this.takeLock.notify();
        }
        LOGGER.trace("Signalled take()");
    }

    /**
     * Waits until an element is available and returns it.
     *
     * <p>The wait ends early, returning {@code null}, when {@link #interruptTake()} has been
     * called or when the calling thread is interrupted. In the latter case the thread's
     * interrupt status is restored before returning. The interrupt-take request is cleared
     * whenever this method returns normally.
     *
     * @return the next element, or {@code null} if the wait was cut short
     * @throws IllegalStateException if the queue has been shut down, including while waiting
     */
    public T take() {
        ensureNotShutdown();
        T item = null;
        synchronized (this.takeLock) {
            while (!this.interruptTake) {
                item = poll();
                if (item != null) {
                    break;
                }
                try {
                    this.takeLock.wait(TAKE_WAIT_MILLIS);
                } catch (InterruptedException e) {
                    LOGGER.info("The take has been interrupted");
                    // Preserve the interrupt for the caller and stop waiting; looping again
                    // would make wait() throw immediately on every pass.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            this.interruptTake = false;
        }
        return item;
    }

    /**
     * Requests that a {@link #take()} stops waiting and returns {@code null}. If no thread is
     * currently inside {@code take()}, the request stays pending and ends the next call.
     */
    public void interruptTake() {
        synchronized (this.takeLock) {
            this.interruptTake = true;
            this.takeLock.notifyAll();
        }
    }

    /**
     * Removes and returns the head of the buffer without waiting. The buffer is topped up from
     * the persistent queue (if any) before the removal when it is empty, and again after a
     * successful removal.
     *
     * @return the head element, or {@code null} if nothing is available
     * @throws IllegalStateException if the queue has been shut down
     */
    public T poll() {
        ensureNotShutdown();
        T item;
        synchronized (this.bufferLock) {
            // Re-check under the lock: shutdown() drains the buffer while holding it.
            ensureNotShutdown();
            if (this.payloadBuffer.isEmpty()) {
                fillBufferFromPersistentQueue();
            }
            item = this.payloadBuffer.poll();
            if (item != null) {
                checkStateAndNotify();
                fillBufferFromPersistentQueue();
            }
        }
        LOGGER.debug("{}", item);
        return item;
    }

    /**
     * Shuts the queue down. A persistent queue writes {@code t} (if not {@code null}) and every
     * buffered element to the persistent queue and then shuts that down; a non-persistent queue
     * counts {@code t} and every buffered element as dropped and clears the buffer. Afterwards
     * transition listeners are notified, the statistics are optionally logged and the
     * statistics MBean is unregistered.
     *
     * @param t an element the caller still holds and wants preserved, may be {@code null}
     * @throws IllegalStateException if the queue has already been shut down
     */
    public void shutdown(T t) {
        synchronized (this.bufferLock) {
            // Check-and-set under the lock so only one caller performs the shutdown and no
            // put()/poll() interleaves with the drain.
            ensureNotShutdown();
            this.shutdown = true;
            if (isPersistent()) {
                persistOutstanding(t);
            } else {
                dropOutstanding(t);
            }
        }
        for (PersistingQueueTransitionListener<T> listener : this.transitionListeners) {
            listener.onShutdown();
        }
        if (this.dumpStatisticsOnExit) {
            LOGGER.info(this.statistics.getQueueStatistics());
        }
        LOGGER.info("Unregistering Statistics MBean ...");
        if (!MBeanServerHelper.getInstance().unregisterMBean(this.statistics.getObjectName())) {
            LOGGER.error("Failed to unregister MBean {}", this.statistics.getObjectName());
        }
        LOGGER.info("Terminated.");
    }

    /**
     * @return {@code true} once {@link #shutdown(Object)} has been called
     */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /** Throws the translated "queue shut down" error if the queue has been shut down. */
    private void ensureNotShutdown() {
        if (this.shutdown) {
            throw new IllegalStateException(I18nTextFactory.getTranslator(MESSAGES_BUNDLE).translate(Constants.QUEUE_SHUTDOWN));
        }
    }

    /** Writes the caller's outstanding element and the whole buffer to persistence, then closes it. */
    private void persistOutstanding(T t) {
        if (t != null) {
            LOGGER.info("Saving first outstanding element to persistent queue...");
            saveToPersistentQueue(t);
        }
        for (T pending : this.payloadBuffer) {
            LOGGER.info("Pushing queue to persistence...");
            saveToPersistentQueue(pending);
        }
        this.persistentQueue.shutdown();
    }

    /** Records the caller's outstanding element and the whole buffer as dropped and empties the buffer. */
    private void dropOutstanding(T t) {
        if (t != null) {
            this.statistics.itemDropped(t);
        }
        for (T pending : this.payloadBuffer) {
            this.statistics.itemDropped(pending);
        }
        this.payloadBuffer.clear();
    }

    /**
     * Sleeps for each throttle delay in turn, offering the element to the buffer after each one.
     *
     * @return {@code true} as soon as an offer succeeds, {@code false} if all retries fail
     */
    private boolean offerWithThrottling(T t) {
        for (Long delay : this.throttleDelays) {
            throttle(delay.longValue());
            if (this.payloadBuffer.offer(t)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Notifies transition listeners about the outcome of a throttled put on a non-persistent
     * queue: {@code onPut()} when the element was accepted, otherwise the element is counted
     * as dropped and {@code onDrop(t)} is fired.
     */
    private void noPersistencePostProcessing(T t, boolean z) {
        if (z) {
            for (PersistingQueueTransitionListener<T> listener : this.transitionListeners) {
                listener.onPut();
            }
        } else {
            LOGGER.debug("Dropping {}", t);
            this.statistics.itemDropped(t);
            for (PersistingQueueTransitionListener<T> listener : this.transitionListeners) {
                listener.onDrop(t);
            }
        }
    }

    /**
     * Moves at most one element from the persistent queue into the buffer. Does nothing for a
     * non-persistent queue, when the buffer has no free slot, or when the persistent queue
     * returns {@code null}.
     */
    private void fillBufferFromPersistentQueue() {
        if (!isPersistent()) {
            return;
        }
        LOGGER.debug("Fill the buffer from database");
        // Never read from the store when there is nowhere to put the element: an element
        // polled from persistence (presumably removed by poll() - confirm against
        // PersistentQueue) and then rejected by the buffer would be lost.
        if (this.payloadBuffer.remainingCapacity() == 0) {
            return;
        }
        T restored = this.persistentQueue.poll();
        if (restored == null) {
            return;
        }
        this.statistics.itemReadFromPersistence(restored);
        LOGGER.debug("Read {} from persistence", restored);
        if (!this.payloadBuffer.offer(restored)) {
            // Only possible if the buffer was filled behind our back (e.g. through
            // getPayloadBuffer()); write the element back instead of discarding it.
            saveToPersistentQueue(restored);
        }
    }

    /**
     * Writes an element to the persistent queue and records it in the statistics. Any failure
     * is logged and the element is counted as dropped instead of propagating the error.
     */
    private void saveToPersistentQueue(T t) {
        try {
            this.persistentQueue.put(t);
            this.statistics.itemWrittenToPersistence(t);
            LOGGER.debug("Wrote {} to persistence", t);
        } catch (Exception e) {
            LOGGER.error(I18nTextFactory.getTranslator(MESSAGES_BUNDLE).translate(Constants.QUEUE_DROP_ELEMENT), e);
            this.statistics.itemDropped(t);
        }
    }

    /**
     * Sleeps for the given number of milliseconds. If the sleep is interrupted, the thread's
     * interrupt status is restored so the caller can still observe it.
     */
    private void throttle(long delayMillis) {
        LOGGER.debug("Sleeping for {}", Long.valueOf(delayMillis));
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            LOGGER.debug("Error while throttling", e);
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers a listener for the "getting full" / "getting empty" buffer notifications.
     *
     * @param persistingQueueStateListener the listener to add
     */
    public void addListener(PersistingQueueStateListener persistingQueueStateListener) {
        this.listeners.add(persistingQueueStateListener);
    }

    /**
     * Registers a listener for put, drop and shutdown notifications.
     *
     * @param persistingQueueTransitionListener the listener to add
     */
    public void addTransitionListener(PersistingQueueTransitionListener<T> persistingQueueTransitionListener) {
        this.transitionListeners.add(persistingQueueTransitionListener);
    }

    /**
     * Tells state listeners when the buffer has fewer than two free slots (getting full) or,
     * failing that, holds exactly one element (getting empty), then fires {@code onPut()} on
     * every transition listener.
     */
    private void checkStateAndNotify() {
        for (PersistingQueueStateListener stateListener : this.listeners) {
            if (this.payloadBuffer.remainingCapacity() < 2) {
                stateListener.onGettingFull();
            } else if (this.payloadBuffer.size() == 1) {
                stateListener.onGettingEmpty();
            }
        }
        // NOTE(review): this is also reached from poll(), so onPut() fires after a removal
        // as well - confirm whether that is intended by the listener contract.
        for (PersistingQueueTransitionListener<T> transitionListener : this.transitionListeners) {
            transitionListener.onPut();
        }
    }

    /** @return {@code true} if a persistent queue backs this instance */
    boolean isPersistent() {
        return this.persistentQueue != null;
    }

    /** @return the backing persistent queue, or {@code null} for a non-persistent instance */
    PersistentQueue<T> getPersistentQueue() {
        return this.persistentQueue;
    }

    /**
     * Empties the in-memory buffer and, if present, the persistent queue. A failure to clear
     * the persistent queue is logged and not propagated.
     */
    void clear() {
        this.payloadBuffer.clear();
        if (isPersistent()) {
            try {
                this.persistentQueue.clear();
            } catch (PersistentQueueException e) {
                LOGGER.error("FATAL: Error clearing persistent queue.", e);
            }
        }
    }

    /** @return the live in-memory buffer (not a copy) */
    LinkedBlockingQueue<T> getPayloadBuffer() {
        return this.payloadBuffer;
    }
}
