package io.datarouter.conveyor.queue.configuration;

import com.google.gson.Gson;
import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.model.databean.Databean;
import io.datarouter.model.key.primary.PrimaryKey;
import io.datarouter.storage.queue.GroupQueueMessage;
import io.datarouter.storage.queue.consumer.GroupQueueConsumer;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import javax.inject.Inject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseGroupQueueConsumerConveyorConfiguration.class */
/**
 * Base {@link ConveyorConfiguration} that peeks one {@link GroupQueueMessage} from a
 * {@link GroupQueueConsumer}, hands its databeans to the subclass, and then acks the message.
 *
 * <p>Subclasses supply the consumer via {@link #getQueueConsumer()} and the handling logic via
 * {@link #processDatabeans(List)}. Peek, processing and ack durations are reported through the
 * injected {@link ConveyorGauges}.
 *
 * @param <PK> primary key type of the queued databeans
 * @param <D> databean type carried by the queue
 */
public abstract class BaseGroupQueueConsumerConveyorConfiguration<PK extends PrimaryKey<PK>, D extends Databean<PK, D>> implements ConveyorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BaseGroupQueueConsumerConveyorConfiguration.class);

    @Inject
    private ConveyorGauges gaugeRecorder;

    /** Used only to serialize the databeans into the exception message when handling fails. */
    @Inject
    private Gson gson;

    /**
     * @return the consumer this conveyor peeks from and acks against
     */
    protected abstract GroupQueueConsumer<PK, D> getQueueConsumer();

    /**
     * Handles the databeans of one peeked message.
     *
     * @param list databeans taken from the peeked message
     */
    protected abstract void processDatabeans(List<D> list);

    /**
     * Peeks a single message, processes its databeans and, if {@link #shouldAck()} allows it, acks the message.
     *
     * @param conveyorRunnable the running conveyor, used for log context and as the gauge/counter key
     * @return a result constructed with {@code false} when the peek returned no message, otherwise with {@code true}
     * @throws RuntimeException wrapping any exception raised while processing, recording metrics or acking; the
     *         message contains the JSON form of the databeans
     */
    @Override
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        Instant beforePeek = Instant.now();
        GroupQueueMessage<PK, D> message = getQueueConsumer().peek(DEFAULT_PEEK_TIMEOUT, DEFAULT_VISIBILITY_TIMEOUT);
        long peekMs = Duration.between(beforePeek, Instant.now()).toMillis();
        gaugeRecorder.savePeekDurationMs(conveyorRunnable, peekMs);
        TracerTool.setAlternativeStartTime();
        if (message == null) {
            logger.info("peeked conveyor={} nullMessage", conveyorRunnable.getName());
            return new Conveyor.ProcessResult(false);
        }
        List<D> databeans = message.getDatabeans();
        logger.info("peeked conveyor={} messageCount={}", conveyorRunnable.getName(), databeans.size());
        Instant beforeProcess = Instant.now();
        try {
            processDatabeans(databeans);
            if (!shouldAck()) {
                // Subclass opted out of acking: skip the processing gauge, the consumed counter and the ack itself.
                return new Conveyor.ProcessResult(true);
            }
            long processMs = Duration.between(beforeProcess, Instant.now()).toMillis();
            gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, processMs);
            // The peek above requested DEFAULT_VISIBILITY_TIMEOUT; warn when processing outlasted it.
            if (processMs > DEFAULT_VISIBILITY_TIMEOUT.toMillis()) {
                logger.warn("slow conveyor conveyor={} durationMs={} databeanCount={}", conveyorRunnable.getName(), processMs, databeans.size());
            }
            logger.info("consumed conveyor={} messageCount={}", conveyorRunnable.getName(), databeans.size());
            ConveyorCounters.incConsumedOpAndDatabeans(conveyorRunnable, databeans.size());
            Instant beforeAck = Instant.now();
            getQueueConsumer().ack(message.getKey());
            long ackMs = Duration.between(beforeAck, Instant.now()).toMillis();
            gaugeRecorder.saveAckDurationMs(conveyorRunnable, ackMs);
            logger.info("acked conveyor={} messageCount={}", conveyorRunnable.getName(), databeans.size());
            ConveyorCounters.incAck(conveyorRunnable);
            return new Conveyor.ProcessResult(true);
        } catch (Exception e) {
            // Attach the payload so the failing databeans can be identified from the stack trace alone.
            throw new RuntimeException("databeans=" + gson.toJson(databeans), e);
        }
    }

    /**
     * Hook deciding whether a successfully processed message is acked.
     *
     * @return {@code true} by default; when an override returns {@code false}, {@link #process(ConveyorRunnable)}
     *         returns right after {@link #processDatabeans(List)} without acking
     */
    protected boolean shouldAck() {
        return true;
    }
}
