package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.BatchProcessingException;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.FluxCapacitorInterceptor;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.TrackingException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/DefaultTracker.class */
/**
 * Runnable that repeatedly fetches message batches from a {@link TrackingClient} and hands them to a consumer,
 * storing the position of the last processed index after each batch.
 *
 * <p>Instances are created and started through the static {@code start(...)} factories, which launch one thread
 * per configured tracker thread and return a {@link Registration} that cancels all of them.
 *
 * <p>A tracker stops itself when the configured {@code maxIndexExclusive} has been reached or when the consumer
 * throws an exception out of the batch handler.
 */
public class DefaultTracker implements Runnable, Registration {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracker.class);
    private static final ThreadGroup threadGroup = new ThreadGroup("DefaultTracker");

    private final Consumer<List<SerializedMessage>> consumer;
    private final Tracker tracker;
    private final Consumer<MessageBatch> processor;
    private final TrackingClient trackingClient;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<Thread> thread = new AtomicReference<>();
    private final Duration retryDelay = Duration.ofSeconds(1);
    private final Long maxIndexExclusive;
    private final Long minIndex;
    private volatile Long lastProcessedIndex;
    private volatile boolean processing;

    /**
     * Starts trackers for the given consumer, placing a {@link FluxCapacitorInterceptor} for the given
     * {@code fluxCapacitor} in front of the batch interceptors already present in the configuration.
     *
     * @param consumer              handler that receives the messages of each batch
     * @param messageType           type of messages to track
     * @param consumerConfiguration configuration of the consumer (name, thread count, index bounds, interceptors)
     * @param fluxCapacitor         instance whose client is used for tracking
     * @return a registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer, MessageType messageType,
                                     ConsumerConfiguration consumerConfiguration, FluxCapacitor fluxCapacitor) {
        List<BatchInterceptor> interceptors = Stream.<BatchInterceptor>concat(
                Stream.of(new FluxCapacitorInterceptor(fluxCapacitor)),
                consumerConfiguration.getBatchInterceptors().stream()).collect(Collectors.toList());
        ConsumerConfiguration extendedConfiguration = consumerConfiguration.toBuilder()
                .clearBatchInterceptors().batchInterceptors(interceptors).build();
        return start(consumer, messageType, extendedConfiguration, fluxCapacitor.client());
    }

    /**
     * Starts one tracker thread per configured thread, using the tracking client that {@code client} provides
     * for the given message type. The trackers are also cancelled via {@code client.beforeShutdown(...)}.
     *
     * @param consumer              handler that receives the messages of each batch
     * @param messageType           type of messages to track
     * @param consumerConfiguration configuration of the consumer
     * @param client                client supplying the tracking client and the tracker id
     * @return a registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer, MessageType messageType,
                                     ConsumerConfiguration consumerConfiguration, Client client) {
        List<DefaultTracker> trackers = IntStream.range(0, consumerConfiguration.getThreads())
                .mapToObj(i -> new DefaultTracker(
                        consumer, consumerConfiguration,
                        new Tracker(consumerConfiguration.getName(),
                                    consumerConfiguration.getTrackerIdFactory().apply(client),
                                    messageType, consumerConfiguration, null),
                        client.getTrackingClient(messageType)))
                .toList();
        Registration registration = launch(trackers, consumerConfiguration, messageType);
        client.beforeShutdown(() -> trackers.forEach(DefaultTracker::cancel));
        return registration;
    }

    /**
     * Starts one tracker thread per configured thread against the given tracking client. Every tracker gets a
     * random UUID as tracker id.
     *
     * @param consumer              handler that receives the messages of each batch
     * @param consumerConfiguration configuration of the consumer
     * @param trackingClient        client used to read batches and store positions
     * @return a registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer,
                                     ConsumerConfiguration consumerConfiguration, TrackingClient trackingClient) {
        List<DefaultTracker> trackers = IntStream.range(0, consumerConfiguration.getThreads())
                .mapToObj(i -> new DefaultTracker(
                        consumer, consumerConfiguration,
                        new Tracker(consumerConfiguration.getName(), UUID.randomUUID().toString(),
                                    trackingClient.getMessageType(), consumerConfiguration, null),
                        trackingClient))
                .toList();
        return launch(trackers, consumerConfiguration, trackingClient.getMessageType());
    }

    /** Starts a named thread in the shared thread group for every tracker and returns a cancel-all registration. */
    private static Registration launch(List<DefaultTracker> trackers, ConsumerConfiguration consumerConfiguration,
                                       MessageType messageType) {
        for (int i = 0; i < trackers.size(); i++) {
            new Thread(threadGroup, trackers.get(i), threadName(consumerConfiguration, messageType, i)).start();
        }
        return () -> trackers.forEach(DefaultTracker::cancel);
    }

    /** Builds "name[-messageType]-index"; the message type is omitted if the consumer name already contains it. */
    private static String threadName(ConsumerConfiguration consumerConfiguration, MessageType messageType,
                                     int index) {
        String name = consumerConfiguration.getName();
        String typeSuffix = name.contains(messageType.name()) ? "" : "-" + messageType;
        return String.format("%s%s-%d", name, typeSuffix, index);
    }

    private DefaultTracker(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration consumerConfiguration,
                           Tracker tracker, TrackingClient trackingClient) {
        this.consumer = consumer;
        this.tracker = tracker;
        this.processor = BatchInterceptor.join(consumerConfiguration.getBatchInterceptors())
                .intercept(this::process, tracker);
        this.trackingClient = trackingClient;
        // Start reading just before the configured minimum index (if any).
        this.lastProcessedIndex = Optional.ofNullable(consumerConfiguration.getMinIndex())
                .map(index -> index - 1).orElse(null);
        this.minIndex = consumerConfiguration.getMinIndex();
        this.maxIndexExclusive = consumerConfiguration.getMaxIndexExclusive();
    }

    /**
     * Fetches and processes batches until the tracker is cancelled. Does nothing if the tracker is already
     * running. While running, the tracker is published via {@code Tracker.current} and the executing thread is
     * remembered so that {@link #cancel()} can interrupt it.
     */
    @Override
    public void run() {
        if (!running.compareAndSet(false, true)) {
            return;
        }
        Tracker.current.set(tracker);
        thread.set(Thread.currentThread());
        while (running.get()) {
            MessageBatch batch = fetch(lastProcessedIndex);
            if (batch != null) {
                Tracker.current.set(tracker.withMessageBatch(batch));
                processor.accept(batch);
            }
        }
        Tracker.current.remove();
        thread.set(null);
    }

    /**
     * Reads the next batch from the tracking client via {@code TimingUtils.retryOnFailure}, passing the retry
     * delay and a predicate that reports whether this tracker is still running.
     *
     * @param lastIndex index of the last processed message, or {@code null} if none
     * @return the value produced by the retry helper (callers treat {@code null} as "no batch")
     */
    protected MessageBatch fetch(Long lastIndex) {
        return TimingUtils.retryOnFailure(
                () -> trackingClient.readAndWait(tracker.getName(), tracker.getTrackerId(), lastIndex,
                                                 tracker.getConfiguration()),
                retryDelay, e -> running.get());
    }

    /**
     * Processes a single batch: skips empty batches and batches arriving after cancellation, drops messages
     * outside the configured index bounds and passes the remainder to the consumer. Cancels the tracker once the
     * batch's last index has reached {@code maxIndexExclusive}.
     *
     * @param messageBatch the batch as received from the tracking client
     */
    protected void process(MessageBatch messageBatch) {
        // Taken from the unfiltered batch: filtering may lower the last index below maxIndexExclusive.
        Long lastIndex = messageBatch.getLastIndex();
        try {
            processing = true;
            if (messageBatch.getMessages().isEmpty() || !running.get()) {
                return;
            }
            MessageBatch filtered = filterBatchIfNeeded(messageBatch);
            if (filtered.getMessages().isEmpty()) {
                // Nothing left to handle, but still move the position forward.
                updatePosition(filtered.getLastIndex(), filtered.getSegment());
                return;
            }
            doProcess(consumer, filtered);
        } finally {
            processing = false;
            if (isMaxIndexReached(lastIndex)) {
                cancel();
            }
        }
    }

    /** Returns true if both values are known and {@code lastIndex} is at or beyond {@code maxIndexExclusive}. */
    private boolean isMaxIndexReached(Long lastIndex) {
        return maxIndexExclusive != null && lastIndex != null && maxIndexExclusive <= lastIndex;
    }

    /**
     * Returns the batch unchanged if no filtering is required; otherwise returns a new batch containing only the
     * messages within the configured index bounds, with an adjusted last index.
     */
    private MessageBatch filterBatchIfNeeded(MessageBatch messageBatch) {
        if (!shouldFilterBatch(messageBatch)) {
            return messageBatch;
        }
        List<SerializedMessage> messages =
                messageBatch.getMessages().stream().filter(messageIndexFilter()).collect(Collectors.toList());
        long lastIndex;
        if (isMaxIndexReached(messageBatch.getLastIndex())) {
            lastIndex = maxIndexExclusive - 1;
        } else if (minIndex == null) {
            lastIndex = messageBatch.getLastIndex();
        } else {
            lastIndex = Math.max(minIndex, messageBatch.getLastIndex());
        }
        return new MessageBatch(messageBatch.getSegment(), messages, lastIndex);
    }

    /**
     * A batch needs filtering if it reaches the maximum index or if its first message lies before the minimum
     * index.
     */
    private boolean shouldFilterBatch(MessageBatch messageBatch) {
        if (isMaxIndexReached(messageBatch.getLastIndex())) {
            return true;
        }
        return minIndex != null && !messageBatch.getMessages().isEmpty()
                && minIndex > messageBatch.getMessages().get(0).getIndex();
    }

    /** Builds a predicate accepting messages with {@code minIndex <= index < maxIndexExclusive}. */
    private Predicate<SerializedMessage> messageIndexFilter() {
        Predicate<SerializedMessage> filter = message -> true;
        if (maxIndexExclusive != null) {
            filter = filter.and(message -> message.getIndex() < maxIndexExclusive);
        }
        if (minIndex != null) {
            filter = filter.and(message -> message.getIndex() >= minIndex);
        }
        return filter;
    }

    /**
     * Passes the messages of the batch to the consumer and stores the new position. If the consumer fails, the
     * error is logged and the tracker is cancelled; for a {@link BatchProcessingException} the position is first
     * moved to the highest index in the batch that lies before the failed message.
     */
    private void doProcess(Consumer<List<SerializedMessage>> consumer, MessageBatch messageBatch) {
        List<SerializedMessage> messages = messageBatch.getMessages();
        try {
            consumer.accept(messages);
            updatePosition(messageBatch.getLastIndex(), messageBatch.getSegment());
        } catch (BatchProcessingException e) {
            log.error("Consumer {} failed to handle batch of {} messages at index {} and did not handle exception. Consumer will be updated to the last processed index and then stopped.",
                      tracker.getName(), messages.size(), e.getMessageIndex());
            Long lastHandledIndex = messages.stream().map(SerializedMessage::getIndex)
                    .filter(index -> e.getMessageIndex() != null && index != null && index < e.getMessageIndex())
                    .max(Comparator.naturalOrder()).orElse(null);
            updatePosition(lastHandledIndex, messageBatch.getSegment());
            // Clear the flag before cancelling so cancel() does not consider a batch to be in progress.
            processing = false;
            cancel();
        } catch (Exception e) {
            log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.",
                      tracker.getName(), messages.size(), e);
            processing = false;
            cancel();
        }
    }

    /**
     * Remembers {@code index} as the last processed index and stores it via the tracking client, using the same
     * retry helper, delay and running-predicate as {@link #fetch(Long)}. Does nothing if {@code index} is null.
     */
    private void updatePosition(Long index, int[] segment) {
        if (index == null) {
            return;
        }
        lastProcessedIndex = index;
        TimingUtils.retryOnFailure(() -> {
            try {
                trackingClient.storePosition(tracker.getName(), segment, index).get();
            } catch (Exception e) {
                throw new TrackingException(String.format(
                        "Failed to store position of segments %s for tracker %s to index %s",
                        Arrays.toString(segment), tracker, index), e);
            }
        }, retryDelay, e -> running.get());
    }

    /**
     * Stops this tracker. Only the first call after the tracker was started has any effect.
     *
     * <p>When invoked from a thread other than the tracker thread, the tracker thread is interrupted if it is not
     * processing a batch; otherwise this method waits until the current batch has finished. If the waiting thread
     * is itself interrupted, its interrupt flag is restored and the method returns without shutting down the
     * batch interceptors. In all other cases every configured batch interceptor is shut down.
     */
    @Override
    public void cancel() {
        if (!running.compareAndSet(true, false)) {
            return;
        }
        if (!Thread.currentThread().equals(thread.get())) {
            try {
                // The worker reference is null if run() has not registered its thread yet or has already ended.
                Thread worker = thread.get();
                if (worker != null && !processing) {
                    worker.interrupt();
                }
            } catch (Exception e) {
                log.warn("Not allowed to cancel tracker {}", tracker.getName(), e);
            } finally {
                thread.set(null);
            }
            while (processing) {
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        tracker.getConfiguration().getBatchInterceptors().forEach(batchInterceptor -> {
            try {
                batchInterceptor.shutdown(tracker);
            } catch (Exception e) {
                log.warn("Failed to stop batch interceptor {}", batchInterceptor, e);
            }
        });
    }
}
