/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.conveyor.queue;

import io.datarouter.conveyor.BaseConveyor;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.queue.QueueConsumer;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.storage.queue.QueueMessage;
import java.time.Duration;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseQueueConsumerConveyor<PK extends PrimaryKey<PK>, D extends Databean<PK, D>>
extends BaseConveyor {
    private static final Logger logger = LoggerFactory.getLogger(BaseQueueConsumerConveyor.class);
    private static final Duration PEEK_TIMEOUT = Duration.ofSeconds(30L);
    private final QueueConsumer<PK, D> queueConsumer;

    public BaseQueueConsumerConveyor(String name, Supplier<Boolean> shouldRun, QueueConsumer<PK, D> queueConsumer) {
        super(name, shouldRun, () -> false);
        this.queueConsumer = queueConsumer;
    }

    @Override
    public BaseConveyor.ProcessBatchResult processBatch() {
        QueueMessage<PK, D> message = this.queueConsumer.peek(PEEK_TIMEOUT);
        if (message == null) {
            logger.info("peeked conveyor={} nullMessage", (Object)this.name);
            return new BaseConveyor.ProcessBatchResult(false);
        }
        Databean databean = message.getDatabean();
        logger.info("peeked conveyor={} messageCount={}", (Object)this.name, (Object)1);
        try {
            this.processOne(databean);
        }
        catch (Exception e) {
            throw new RuntimeException("databean=" + databean, e);
        }
        logger.info("consumed conveyor={} messageCount={}", (Object)this.name, (Object)1);
        ConveyorCounters.incConsumedOpAndDatabeans(this, 1L);
        this.queueConsumer.ack(message.getKey());
        logger.info("acked conveyor={} messageCount={}", (Object)this.name, (Object)1);
        ConveyorCounters.incAck(this);
        return new BaseConveyor.ProcessBatchResult(true);
    }

    protected abstract void processOne(D var1);
}

