package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.ErrorHandler;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/Processor.class */
public class Processor implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(Processor.class);
    private final String name;
    private final int channel;
    private final int maxFetchBatchSize;
    private final int maxWaitDuration;
    private final ConsumerService consumerService;
    private final Consumer<List<Message>> consumer;
    private final int maxConsumerBatchSize;
    private final Duration retryDelay;
    private final ErrorHandler<List<Message>> consumerErrorHandler;
    private volatile boolean running;

    public Processor(String str, ConsumerService consumerService, Consumer<List<Message>> consumer) {
        this(str, 0, consumerService, consumer);
    }

    public Processor(String str, int i, ConsumerService consumerService, Consumer<List<Message>> consumer) {
        this(str, i, 1024, Duration.ofMillis(10000L), consumerService, consumer, 1024, Duration.ofSeconds(1L), (exc, list) -> {
            log.error("Consumer {} failed to handle batch {}", new Object[]{str, list, exc});
        });
    }

    public Processor(String str, int i, int i2, Duration duration, ConsumerService consumerService, Consumer<List<Message>> consumer, int i3, Duration duration2, ErrorHandler<List<Message>> errorHandler) {
        this.name = str;
        this.channel = i;
        this.maxFetchBatchSize = i2;
        this.maxWaitDuration = (int) duration.toMillis();
        this.consumerService = consumerService;
        this.consumer = consumer;
        this.maxConsumerBatchSize = i3;
        this.retryDelay = duration2;
        this.consumerErrorHandler = errorHandler;
    }

    public static Registration startSingle(String str, ConsumerService consumerService, Consumer<List<Message>> consumer) {
        Processor processor = new Processor(str, consumerService, consumer);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.submit(processor);
        return () -> {
            processor.stop();
            newSingleThreadExecutor.shutdown();
            return true;
        };
    }

    public static Registration startMultiple(String str, int i, ConsumerService consumerService, Consumer<List<Message>> consumer) {
        List list = (List) IntStream.range(0, i).mapToObj(i2 -> {
            return new Processor(str, i2, consumerService, consumer);
        }).collect(Collectors.toList());
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(i);
        newFixedThreadPool.getClass();
        list.forEach((v1) -> {
            r1.submit(v1);
        });
        return () -> {
            list.forEach((v0) -> {
                v0.stop();
            });
            newFixedThreadPool.shutdown();
            return true;
        };
    }

    @Override // java.lang.Runnable
    public void run() {
        if (!this.running) {
            this.running = true;
        }
        while (this.running) {
            MessageBatch fetch = fetch();
            process(fetch.getMessages(), fetch.getSegment());
        }
    }

    public void stop() {
        this.running = false;
    }

    protected MessageBatch fetch() {
        return (MessageBatch) TimingUtils.retryOnFailure(() -> {
            return this.consumerService.read(this.name, this.channel, this.maxFetchBatchSize, this.maxWaitDuration, TimeUnit.MILLISECONDS);
        }, this.retryDelay, exc -> {
            return this.running;
        });
    }

    protected void process(List<Message> list, int[] iArr) {
        if (list.isEmpty() || !this.running) {
            return;
        }
        if (list.size() <= this.maxConsumerBatchSize) {
            processBatch(list, iArr);
            return;
        }
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= list.size()) {
                return;
            }
            processBatch(list.subList(i2, Math.min(i2 + this.maxConsumerBatchSize, list.size())), iArr);
            i = i2 + this.maxConsumerBatchSize;
        }
    }

    protected void processBatch(List<Message> list, int[] iArr) {
        try {
            this.consumer.accept(list);
        } catch (Exception e) {
            this.consumerErrorHandler.handleError(e, list);
        }
        TimingUtils.retryOnFailure(() -> {
            this.consumerService.storePosition(this.name, iArr, ((Message) list.get(list.size() - 1)).getIndex().longValue());
            return null;
        }, this.retryDelay, exc -> {
            return this.running;
        });
    }
}
