/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.conveyor.queue;

import io.datarouter.conveyor.BaseConveyor;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.queue.BatchedQueueConsumer;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.queue.BaseQueueMessage;
import io.datarouter.storage.queue.QueueMessage;
import io.datarouter.storage.queue.QueueMessageKey;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseBatchedQueueConsumerConveyor<PK extends PrimaryKey<PK>, D extends Databean<PK, D>>
extends BaseConveyor {
    private static final Logger logger = LoggerFactory.getLogger(BaseBatchedQueueConsumerConveyor.class);
    private static final Duration PEEK_TIMEOUT = Duration.ofSeconds(10L);
    private static final int BATCH_SIZE = 100;
    private final BatchedQueueConsumer<PK, D> queueConsumer;
    private final Object lock = new Object();
    private List<QueueMessage<PK, D>> buffer;

    public BaseBatchedQueueConsumerConveyor(String name, Supplier<Boolean> shouldRun, BatchedQueueConsumer<PK, D> queueConsumer) {
        super(name, shouldRun, () -> false);
        this.queueConsumer = queueConsumer;
        this.buffer = new ArrayList<QueueMessage<PK, D>>(100);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public BaseConveyor.ProcessBatchResult processBatch() {
        List<QueueMessage<PK, D>> currentBuffer = Collections.emptyList();
        QueueMessage<PK, D> message = this.queueConsumer.peek(PEEK_TIMEOUT);
        if (message == null) {
            logger.info("peeked conveyor={} nullMessage", (Object)this.name);
            Object object = this.lock;
            synchronized (object) {
                currentBuffer = this.copyAndClearBuffer();
            }
            this.flushBuffer(currentBuffer);
            return new BaseConveyor.ProcessBatchResult(false);
        }
        Object object = this.lock;
        synchronized (object) {
            logger.info("peeked conveyor={} messageCount={}", (Object)this.name, (Object)1);
            this.buffer.add(message);
            if (this.buffer.size() >= 100) {
                currentBuffer = this.copyAndClearBuffer();
            }
        }
        this.flushBuffer(currentBuffer);
        return new BaseConveyor.ProcessBatchResult(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void interrupted() throws Exception {
        try {
            List<QueueMessage<PK, D>> currentBuffer;
            Object object = this.lock;
            synchronized (object) {
                currentBuffer = this.copyAndClearBuffer();
            }
            this.flushBuffer(currentBuffer);
        }
        catch (Exception ex) {
            throw new Exception("Exception processing buffer. bufferSize=" + this.buffer.size() + " bufferMessages=" + Arrays.toString(this.buffer.toArray()), ex);
        }
    }

    private void flushBuffer(List<QueueMessage<PK, D>> currentBuffer) {
        if (currentBuffer.isEmpty()) {
            return;
        }
        Scanner.of(currentBuffer).map(QueueMessage::getDatabean).flush(this::processBuffer);
        ConveyorCounters.incFlushBuffer(this, currentBuffer.size());
        Scanner.of(currentBuffer).map(BaseQueueMessage::getKey).flush(this::ackMessageLogAndIncCounter);
    }

    private List<QueueMessage<PK, D>> copyAndClearBuffer() {
        List<QueueMessage<PK, D>> currentBuffer = this.buffer.stream().collect(Collectors.toList());
        this.buffer.clear();
        return currentBuffer;
    }

    private void ackMessageLogAndIncCounter(List<QueueMessageKey> messageKeys) {
        logger.info("consumed conveyor={} messageCount={}", (Object)this.name, (Object)messageKeys.size());
        ConveyorCounters.incConsumedOpAndDatabeans(this, messageKeys.size());
        this.queueConsumer.ackMulti(messageKeys);
        logger.info("acked conveyor={} messageCount={}", (Object)this.name, (Object)messageKeys.size());
        ConveyorCounters.incAck(this, messageKeys.size());
    }

    protected abstract void processBuffer(List<D> var1);
}

