package ru.infon.queuebox.mongo;

import gaillard.mongo.MongoQueueCore;
import java.util.Collection;
import java.util.Date;
import java.util.LinkedList;
import java.util.Properties;
import org.bson.Document;
import ru.infon.queuebox.MessageContainer;
import ru.infon.queuebox.QueueBehave;
import ru.infon.queuebox.QueueConsumer;
import ru.infon.queuebox.QueueSerializer;
import ru.infon.queuebox.RoutedMessage;

/* loaded from: input_file:ru/infon/queuebox/mongo/RoutedQueueBehave.class */
public class RoutedQueueBehave<T extends RoutedMessage> implements QueueBehave<T> {
    private static final String FIELD_SOURCE = "source";
    private static final String FIELD_DESCTINATION = "destination";
    private static final String FIELD_ID = "id";
    public static final String PROPERTY_FETCH_LIMIT = "queue.fetch.limit";
    private static final int DEFAULT_FETCH_LIMIT = 100;
    private final QueueSerializer<T> serializer;
    private final MongoConnection connection;
    private final MongoQueueCore mongoQueueCore;
    private int fetchLimit;

    public RoutedQueueBehave(Properties properties, Class<T> cls) {
        this.fetchLimit = DEFAULT_FETCH_LIMIT;
        this.serializer = new MongoJacksonSerializer(cls);
        this.connection = new MongoConnection(properties);
        this.mongoQueueCore = new MongoQueueCore(this.connection.getMongoCollection(Document.class));
        Document document = new Document();
        document.append(FIELD_DESCTINATION, 1);
        this.mongoQueueCore.ensureGetIndex(document);
        try {
            this.fetchLimit = Integer.parseInt(properties.getProperty(PROPERTY_FETCH_LIMIT));
        } catch (NumberFormatException e) {
        }
    }

    @Override // ru.infon.queuebox.QueueBehave
    public int getThreadsCount() {
        return this.connection.getThreadsCount();
    }

    @Override // ru.infon.queuebox.QueueBehave
    public void put(MessageContainer<T> messageContainer) {
        T message = messageContainer.getMessage();
        Document serialize = this.serializer.serialize(message);
        serialize.append(FIELD_SOURCE, message.getSource());
        serialize.append(FIELD_DESCTINATION, message.getDestination());
        this.mongoQueueCore.send(this.serializer.serialize(messageContainer.getMessage()), new Date(), messageContainer.getPriority());
    }

    @Override // ru.infon.queuebox.QueueBehave
    public Collection<MessageContainer<T>> find(QueueConsumer<T> queueConsumer) {
        Document document;
        Document document2 = new Document();
        document2.append(FIELD_DESCTINATION, queueConsumer.getConsumerId());
        LinkedList linkedList = new LinkedList();
        int i = this.fetchLimit;
        while (true) {
            int i2 = i;
            i--;
            if (i2 <= 0 || (document = this.mongoQueueCore.get(document2, 10, DEFAULT_FETCH_LIMIT, 0L)) == null) {
                break;
            }
            Object obj = document.get(FIELD_ID);
            document.remove(FIELD_ID);
            String string = document.getString(FIELD_DESCTINATION);
            String string2 = document.getString(FIELD_SOURCE);
            document.remove(FIELD_DESCTINATION);
            document.remove(FIELD_SOURCE);
            T deserialize = this.serializer.deserialize(document);
            deserialize.setSource(string2);
            deserialize.setDestination(string);
            MessageContainer messageContainer = new MessageContainer(deserialize);
            messageContainer.setId(obj);
            linkedList.add(messageContainer);
        }
        return linkedList;
    }

    @Override // ru.infon.queuebox.QueueBehave
    public void remove(MessageContainer<T> messageContainer) {
        Document document = new Document();
        document.append(FIELD_ID, messageContainer.getId());
        this.mongoQueueCore.ack(document);
    }

    @Override // ru.infon.queuebox.QueueBehave
    public void reset(MessageContainer<T> messageContainer) {
        T message = messageContainer.getMessage();
        Document serialize = this.serializer.serialize(message);
        serialize.append(FIELD_SOURCE, message.getSource());
        serialize.append(FIELD_DESCTINATION, message.getDestination());
        serialize.append(FIELD_ID, messageContainer.getId());
        this.mongoQueueCore.requeue(serialize);
    }
}
