package io.datarouter.conveyor.queue.configuration;

import io.datarouter.conveyor.Conveyor;
import io.datarouter.conveyor.ConveyorConfiguration;
import io.datarouter.conveyor.ConveyorCounters;
import io.datarouter.conveyor.ConveyorGauges;
import io.datarouter.conveyor.ConveyorRunnable;
import io.datarouter.instrumentation.trace.TracerTool;
import io.datarouter.util.buffer.MemoryBuffer;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/* loaded from: input_file:io/datarouter/conveyor/queue/configuration/BaseMemoryBufferPutMultiConsumerConveyorConfiguration.class */
public abstract class BaseMemoryBufferPutMultiConsumerConveyorConfiguration<D> implements ConveyorConfiguration {
    private static final int BATCH_SIZE = 100;

    @Inject
    private ConveyorGauges gaugeRecorder;

    protected abstract MemoryBuffer<D> getMemoryBuffer();

    protected abstract Consumer<Collection<D>> getPutMultiConsumer();

    @Override // io.datarouter.conveyor.ConveyorConfiguration
    public Conveyor.ProcessResult process(ConveyorRunnable conveyorRunnable) {
        Instant now = Instant.now();
        List pollMultiWithLimit = getMemoryBuffer().pollMultiWithLimit(BATCH_SIZE);
        this.gaugeRecorder.savePeekDurationMs(conveyorRunnable, Duration.between(now, Instant.now()).toMillis());
        TracerTool.setAlternativeStartTime();
        if (pollMultiWithLimit.isEmpty()) {
            return new Conveyor.ProcessResult(false);
        }
        try {
            Instant now2 = Instant.now();
            getPutMultiConsumer().accept(pollMultiWithLimit);
            this.gaugeRecorder.saveProcessBufferDurationMs(conveyorRunnable, Duration.between(now2, Instant.now()).toMillis());
            ConveyorCounters.incPutMultiOpAndDatabeans(conveyorRunnable, pollMultiWithLimit.size());
            return new Conveyor.ProcessResult(true);
        } catch (RuntimeException e) {
            MemoryBuffer<D> memoryBuffer = getMemoryBuffer();
            memoryBuffer.getClass();
            pollMultiWithLimit.forEach(memoryBuffer::offer);
            ConveyorCounters.inc(conveyorRunnable, "putMulti exception", 1L);
            throw e;
        }
    }
}
