package com.day.util.mq;

import com.day.util.Condition;
import com.day.util.Counter;
import com.day.util.Queue;
import com.day.util.mq.HistoryImpl;
import java.io.File;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/day/util/mq/MessageQueueImpl.class */
final class MessageQueueImpl implements MessageQueue, Runnable {
    private static final Logger log = LoggerFactory.getLogger(MessageQueueImpl.class);
    private final String queueName;
    private Consumer consumer;
    private MessageFactory factory;
    private final Queue queue;
    private final HistoryImpl history;
    private final File directory;
    private final Condition flushing;
    private final Condition purging;
    private final Counter pending;
    private final Object deliveryDelay;

    public MessageQueueImpl(String str, MessageFactory messageFactory, File file) throws IOException {
        this.queue = new Queue();
        this.history = new HistoryImpl();
        this.flushing = new Condition();
        this.purging = new Condition();
        this.pending = new Counter();
        this.deliveryDelay = new Object();
        this.queueName = str;
        this.factory = messageFactory;
        this.directory = file;
        this.history.init(messageFactory, file, str + ".hist");
        HistoryImpl.EntryImpl[] entries = this.history.entries();
        for (int i = 0; i < entries.length; i++) {
            if (entries[i].getState() == 1) {
                this.queue.enqueue(entries[i]);
                this.pending.increment();
            }
        }
    }

    @Override // com.day.util.mq.MessageQueue
    public History history() {
        return this.history;
    }

    public MessageQueueImpl(String str) {
        this.queue = new Queue();
        this.history = new HistoryImpl();
        this.flushing = new Condition();
        this.purging = new Condition();
        this.pending = new Counter();
        this.deliveryDelay = new Object();
        this.queueName = str;
        this.directory = null;
    }

    @Override // com.day.util.mq.MessageQueue
    public void setConsumer(Consumer consumer) {
        if (this.consumer != null) {
            throw new IllegalStateException("consumer already set");
        }
        this.consumer = consumer;
        start();
    }

    @Override // com.day.util.mq.MessageQueue
    public void send(Message message) {
        this.queue.enqueue(this.history.append(message));
        this.pending.increment();
        deliverNow();
    }

    @Override // com.day.util.mq.MessageQueue
    public boolean contains(final Object obj) {
        return this.queue.contains(new Object() { // from class: com.day.util.mq.MessageQueueImpl.1
            public boolean equals(Object obj2) {
                if (obj2 instanceof HistoryImpl.EntryImpl) {
                    return obj.equals(((HistoryImpl.EntryImpl) obj2).getMessage());
                }
                return false;
            }
        });
    }

    void flush() {
        this.flushing.setTrue();
        deliverNow();
        try {
            this.pending.waitUntilZero();
        } catch (InterruptedException e) {
        }
        this.flushing.setFalse();
    }

    @Override // com.day.util.mq.MessageQueue
    public void purge() {
        this.purging.setTrue();
        flush();
        this.purging.setFalse();
    }

    @Override // com.day.util.mq.MessageQueue
    public void destroy() {
        purge();
        stop();
        try {
            this.history.delete();
        } catch (IOException e) {
        }
    }

    public void start() {
        Thread thread = new Thread(this, "MessageQueue[" + this.queueName + "]");
        thread.setDaemon(true);
        thread.start();
    }

    public void stop() {
        this.flushing.setTrue();
        deliverNow();
        this.queue.close();
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            try {
                deliver((HistoryImpl.EntryImpl) this.queue.dequeue());
                this.pending.decrement();
            } catch (InterruptedException e) {
                try {
                    this.history.close();
                    return;
                } catch (IOException e2) {
                    return;
                }
            }
        }
    }

    @Override // com.day.util.mq.MessageQueue
    public void deliverNow() {
        synchronized (this.deliveryDelay) {
            this.deliveryDelay.notify();
        }
    }

    void deliver(HistoryImpl.EntryImpl entryImpl) {
        while (this.flushing.isFalse()) {
            try {
                this.consumer.consume(entryImpl.message);
                entryImpl.setState(2);
                entryImpl.message.purge();
                return;
            } catch (UndeliverableException e) {
                entryImpl.message.release();
                synchronized (this.deliveryDelay) {
                    try {
                        this.deliveryDelay.wait(e.timeout);
                    } catch (InterruptedException e2) {
                    }
                }
            } catch (Throwable th) {
                log.error("Uncaught exception in consumer.", th);
            }
        }
        if (this.purging.isTrue()) {
            entryImpl.message.purge();
            entryImpl.setState(3);
        }
    }
}
