package io.datarouter.conveyor.queue.configuration;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.scanner.Scanner;
import io.datarouter.storage.queue.BlobQueueMessage;
import io.datarouter.storage.queue.consumer.BlobQueueConsumer;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseBlobQueueConsumerConveyorConfiguration.class */
/**
 * Base {@link ConveyorConfiguration} for conveyors that consume from a blob queue.
 *
 * <p>Each call to {@link #process(ConveyorRunnable)} peeks at most one message from the
 * {@link BlobQueueConsumer} supplied by the subclass, passes its decoded contents to
 * {@link #processOne(Scanner)}, and then acks the message unless {@link #shouldAck()} returns
 * {@code false}. Peek, processing and ack durations are reported through the injected
 * {@link ConveyorGauges}.
 *
 * @param <T> element type produced by decoding a queue message
 */
public abstract class BaseBlobQueueConsumerConveyorConfiguration<T> implements ConveyorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BaseBlobQueueConsumerConveyorConfiguration.class);

    @Inject
    private ConveyorGauges gaugeRecorder;

    /**
     * Supplies the consumer that messages are peeked from and acked against.
     *
     * @return the queue consumer used by {@link #process(ConveyorRunnable)}
     */
    protected abstract BlobQueueConsumer<T> getQueueConsumer();

    /**
     * Handles the decoded contents of a single peeked message.
     *
     * @param scanner the items obtained from the message via {@code scanSplitDecodedData()}
     */
    protected abstract void processOne(Scanner<T> scanner);

    /**
     * Peeks one message, processes it and, when {@link #shouldAck()} allows, acks it.
     *
     * @param conveyorRunnable the conveyor on whose behalf metrics and counters are recorded
     * @return a result constructed with {@code false} when the peek returned no message, otherwise
     *         one constructed with {@code true}
     * @throws RuntimeException wrapping any exception thrown while processing or acking the message
     */
    @Override
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        Instant beforePeek = Instant.now();
        Optional<BlobQueueMessage<T>> optionalMessage = getQueueConsumer().peek(
                DEFAULT_PEEK_TIMEOUT,
                DEFAULT_VISIBILITY_TIMEOUT);
        long peekDurationMs = Duration.between(beforePeek, Instant.now()).toMillis();
        this.gaugeRecorder.savePeekDurationMs(conveyorRunnable, peekDurationMs);
        TracerTool.setAlternativeStartTime();
        if (optionalMessage.isEmpty()) {
            logger.info("peeked conveyor={} nullMessage", conveyorRunnable.getName());
            return new Conveyor.ProcessResult(false);
        }
        BlobQueueMessage<T> message = optionalMessage.get();
        logger.info("peeked conveyor={} messageCount={}", conveyorRunnable.getName(), 1);
        Instant beforeProcess = Instant.now();
        try {
            processOne(message.scanSplitDecodedData());
            if (!shouldAck()) {
                // The subclass opted out of acking: skip the process/ack gauges and counters.
                return new Conveyor.ProcessResult(true);
            }
            // Measure once and reuse the value for the gauge, the threshold check and the log line.
            long processDurationMs = Duration.between(beforeProcess, Instant.now()).toMillis();
            this.gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, processDurationMs);
            // Processing outlasted the visibility timeout that was requested at peek time.
            if (processDurationMs > DEFAULT_VISIBILITY_TIMEOUT.toMillis()) {
                logger.warn("slow conveyor conveyor={} durationMs={}", conveyorRunnable.getName(), processDurationMs);
            }
            logger.info("consumed conveyor={} messageCount={}", conveyorRunnable.getName(), 1);
            ConveyorCounters.incConsumedOpAndDatabeans(conveyorRunnable, 1L);
            Instant beforeAck = Instant.now();
            getQueueConsumer().ack(message);
            long ackDurationMs = Duration.between(beforeAck, Instant.now()).toMillis();
            this.gaugeRecorder.saveAckDurationMs(conveyorRunnable, ackDurationMs);
            logger.info("acked conveyor={} messageCount={}", conveyorRunnable.getName(), 1);
            ConveyorCounters.incAck(conveyorRunnable);
            return new Conveyor.ProcessResult(true);
        } catch (Exception e) {
            throw new RuntimeException("failed to process message", e);
        }
    }

    /**
     * Whether a successfully processed message should be acked.
     *
     * @return {@code true} by default; subclasses may override to leave the message un-acked
     */
    protected boolean shouldAck() {
        return true;
    }
}
