/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.BatchProcessingException;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.FluxCapacitorInterceptor;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.TrackingException;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runnable tracker that repeatedly reads message batches from a {@link TrackingClient}, passes the
 * messages of each batch to a consumer and stores the reached position afterwards.
 *
 * <p>Instances are created through the static {@code start(...)} factories, which create one tracker
 * per configured thread, start each on its own thread and return a {@link Registration} that cancels
 * all of them.
 */
public class DefaultTracker implements Runnable, Registration {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracker.class);
    private static final ThreadGroup threadGroup = new ThreadGroup("DefaultTracker");

    /** Receives the (possibly filtered) messages of every non-empty batch. */
    private final Consumer<List<SerializedMessage>> consumer;
    private final Tracker tracker;
    /** {@link #process(MessageBatch)} wrapped by the configured batch interceptors. */
    private final Consumer<MessageBatch> processor;
    private final TrackingClient trackingClient;
    /** True from the moment {@link #run()} starts until {@link #cancel()} succeeds. */
    private final AtomicBoolean running = new AtomicBoolean();
    /** Thread currently executing {@link #run()}, or {@code null} when there is none. */
    private final AtomicReference<Thread> thread = new AtomicReference<>();
    /** Delay passed to {@code TimingUtils.retryOnFailure} for fetching and position storage. */
    private final Duration retryDelay;
    private final Long maxIndexExclusive;
    private final Long minIndex;
    /** Index passed to the next fetch; initialised to {@code minIndex - 1} when a min index is configured. */
    private volatile Long lastProcessedIndex;
    /** True while {@link #process(MessageBatch)} is handling a batch. */
    private volatile boolean processing;

    /**
     * Starts trackers using the client of the given {@link FluxCapacitor}. A
     * {@link FluxCapacitorInterceptor} for that instance is placed in front of the batch interceptors
     * already present in {@code config}.
     *
     * @param consumer      receives the messages of each batch
     * @param messageType   type of messages to track
     * @param config        consumer configuration
     * @param fluxCapacitor instance whose client is used for tracking
     * @return registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer, MessageType messageType,
                                     ConsumerConfiguration config, FluxCapacitor fluxCapacitor) {
        return DefaultTracker.start(
                consumer, messageType,
                config.toBuilder().clearBatchInterceptors().batchInterceptors(
                        Stream.concat(Stream.of(new FluxCapacitorInterceptor(fluxCapacitor)),
                                      config.getBatchInterceptors().stream())
                                .collect(Collectors.toList())).build(),
                fluxCapacitor.client());
    }

    /**
     * Starts {@code config.getThreads()} trackers, each on its own thread, and registers a
     * before-shutdown task on the client that cancels them.
     *
     * @param consumer    receives the messages of each batch
     * @param messageType type of messages to track
     * @param config      consumer configuration
     * @param client      client supplying the tracking client and the tracker ids
     * @return registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer, MessageType messageType,
                                     ConsumerConfiguration config, Client client) {
        List<DefaultTracker> trackers = IntStream.range(0, config.getThreads())
                .mapToObj(i -> new DefaultTracker(
                        consumer, config,
                        new Tracker(config.getName(), config.getTrackerIdFactory().apply(client), messageType,
                                    config, null),
                        client.getTrackingClient(messageType)))
                .toList();
        startThreads(trackers, config.getName(), messageType);
        client.beforeShutdown(() -> trackers.forEach(DefaultTracker::cancel));
        return () -> trackers.forEach(DefaultTracker::cancel);
    }

    /**
     * Starts {@code config.getThreads()} trackers against the given tracking client. Each tracker gets
     * a random UUID as tracker id.
     *
     * @param consumer       receives the messages of each batch
     * @param config         consumer configuration
     * @param trackingClient client to read batches from and store positions in
     * @return registration that cancels all started trackers
     */
    public static Registration start(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration config,
                                     TrackingClient trackingClient) {
        List<DefaultTracker> trackers = IntStream.range(0, config.getThreads())
                .mapToObj(i -> new DefaultTracker(
                        consumer, config,
                        new Tracker(config.getName(), UUID.randomUUID().toString(),
                                    trackingClient.getMessageType(), config, null),
                        trackingClient))
                .toList();
        startThreads(trackers, config.getName(), trackingClient.getMessageType());
        return () -> trackers.forEach(DefaultTracker::cancel);
    }

    /**
     * Starts every tracker on a new thread in the shared thread group. Threads are named
     * {@code <consumerName>[-<messageType>]-<n>}; the message type part is left out when the consumer
     * name already contains it.
     */
    private static void startThreads(List<DefaultTracker> trackers, String consumerName,
                                     MessageType messageType) {
        for (int i = 0; i < trackers.size(); ++i) {
            String typeSuffix = consumerName.contains(messageType.name()) ? "" : "-" + messageType;
            String threadName = String.format("%s%s-%d", consumerName, typeSuffix, i);
            new Thread(threadGroup, trackers.get(i), threadName).start();
        }
    }

    private DefaultTracker(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration config,
                           Tracker tracker, TrackingClient trackingClient) {
        this.consumer = consumer;
        this.tracker = tracker;
        this.processor = BatchInterceptor.join(config.getBatchInterceptors()).intercept(this::process, tracker);
        this.trackingClient = trackingClient;
        this.retryDelay = Duration.ofSeconds(1L);
        // Start reading just before the configured minimum index, if there is one.
        this.lastProcessedIndex = Optional.ofNullable(config.getMinIndex()).map(i -> i - 1L).orElse(null);
        this.minIndex = config.getMinIndex();
        this.maxIndexExclusive = config.getMaxIndexExclusive();
    }

    /**
     * Fetches and processes batches until the tracker is cancelled. Does nothing when the tracker is
     * already running.
     *
     * <p>There is no catch clause here: an exception escaping the processor ends the loop and
     * propagates, leaving {@code running} set while the thread reference is cleared.
     */
    @Override
    public void run() {
        if (!this.running.compareAndSet(false, true)) {
            return;
        }
        Tracker.current.set(this.tracker);
        this.thread.set(Thread.currentThread());
        try {
            while (this.running.get()) {
                MessageBatch batch = this.fetch(this.lastProcessedIndex);
                if (batch == null) {
                    continue;
                }
                Tracker.current.set(this.tracker.withMessageBatch(batch));
                this.processor.accept(batch);
            }
        } finally {
            this.thread.set(null);
            Tracker.current.remove();
        }
    }

    /**
     * Reads the next batch from the tracking client, retrying on failure for as long as the tracker is
     * running.
     *
     * @param lastIndex index passed to the tracking client as the last index seen by this tracker
     * @return the batch returned by the tracking client
     */
    protected MessageBatch fetch(Long lastIndex) {
        return (MessageBatch) TimingUtils.retryOnFailure(
                () -> this.trackingClient.readAndWait(this.tracker.getName(), this.tracker.getTrackerId(),
                                                      lastIndex, this.tracker.getConfiguration()),
                this.retryDelay, e -> this.running.get());
    }

    /**
     * Handles a single batch. Empty batches (before or after index filtering) only update the stored
     * position; other batches are handed to the consumer. The tracker cancels itself once the last
     * index of the incoming batch has reached the configured maximum index.
     *
     * @param batch the batch to process
     */
    protected void process(MessageBatch batch) {
        // Captured before filtering, which may replace the batch with one that has a lower last index.
        Long lastIndex = batch.getLastIndex();
        try {
            this.processing = true;
            if (!this.running.get()) {
                return;
            }
            if (batch.getMessages().isEmpty()) {
                this.updatePosition(batch.getLastIndex(), batch.getSegment());
                return;
            }
            batch = this.filterBatchIfNeeded(batch);
            if (batch.getMessages().isEmpty()) {
                this.updatePosition(batch.getLastIndex(), batch.getSegment());
                return;
            }
            this.doProcess(this.consumer, batch);
        } finally {
            this.processing = false;
            if (this.isMaxIndexReached(lastIndex)) {
                this.cancel();
            }
        }
    }

    /** Returns true when a maximum index is configured and {@code lastIndex} is at or beyond it. */
    private boolean isMaxIndexReached(Long lastIndex) {
        return this.maxIndexExclusive != null && lastIndex != null && this.maxIndexExclusive <= lastIndex;
    }

    /**
     * Returns the batch unchanged, or a copy without the messages that fall outside the configured
     * index bounds and with a correspondingly adjusted last index.
     */
    private MessageBatch filterBatchIfNeeded(MessageBatch batch) {
        if (!this.shouldFilterBatch(batch)) {
            return batch;
        }
        Predicate<SerializedMessage> filter = this.messageIndexFilter();
        long newLastIndex;
        if (this.isMaxIndexReached(batch.getLastIndex())) {
            newLastIndex = this.maxIndexExclusive - 1L;
        } else if (this.minIndex == null) {
            newLastIndex = batch.getLastIndex();
        } else {
            newLastIndex = Math.max(this.minIndex, batch.getLastIndex());
        }
        List<SerializedMessage> retained = batch.getMessages().stream().filter(filter).collect(Collectors.toList());
        return new MessageBatch(batch.getSegment(), retained, Long.valueOf(newLastIndex), batch.getPosition());
    }

    /**
     * Returns true when the batch reaches the maximum index or when its first message lies before the
     * minimum index.
     */
    private boolean shouldFilterBatch(MessageBatch batch) {
        return this.isMaxIndexReached(batch.getLastIndex())
               || this.minIndex != null && !batch.getMessages().isEmpty()
                  && this.minIndex > batch.getMessages().get(0).getIndex();
    }

    /** Builds a predicate accepting messages with {@code minIndex <= index < maxIndexExclusive}. */
    private Predicate<SerializedMessage> messageIndexFilter() {
        Predicate<SerializedMessage> filter = m -> true;
        if (this.maxIndexExclusive != null) {
            filter = filter.and(m -> m.getIndex() < this.maxIndexExclusive);
        }
        if (this.minIndex != null) {
            filter = filter.and(m -> m.getIndex() >= this.minIndex);
        }
        return filter;
    }

    /**
     * Passes the messages of the batch to the consumer and stores the batch's last index on success.
     * On a {@link BatchProcessingException} the position is moved to the highest message index below
     * the failed index (if any) and the tracker is cancelled; on any other exception the tracker is
     * cancelled without a position update.
     */
    private void doProcess(Consumer<List<SerializedMessage>> consumer, MessageBatch messageBatch) {
        List<SerializedMessage> messages = messageBatch.getMessages();
        try {
            consumer.accept(messages);
        } catch (BatchProcessingException e) {
            log.error("Consumer {} failed to handle batch of {} messages at index {} and did not handle exception. Consumer will be updated to the last processed index and then stopped.",
                      this.tracker.getName(), messages.size(), e.getMessageIndex());
            Long lastHandledIndex = messages.stream().map(SerializedMessage::getIndex)
                    .filter(i -> e.getMessageIndex() != null && i != null && i < e.getMessageIndex())
                    .max(Comparator.naturalOrder()).orElse(null);
            this.updatePosition(lastHandledIndex, messageBatch.getSegment());
            this.processing = false;
            this.cancel();
            return;
        } catch (Exception e) {
            log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.",
                      this.tracker.getName(), messages.size(), e);
            this.processing = false;
            this.cancel();
            return;
        }
        this.updatePosition(messageBatch.getLastIndex(), messageBatch.getSegment());
    }

    /**
     * Records {@code index} as the last processed index and stores it via the tracking client,
     * retrying on failure for as long as the tracker is running. A {@code null} index is ignored.
     */
    private void updatePosition(Long index, int[] segment) {
        if (index == null) {
            return;
        }
        this.lastProcessedIndex = index;
        TimingUtils.retryOnFailure(() -> {
            try {
                this.trackingClient.storePosition(this.tracker.getName(), segment, index).get();
            } catch (Exception e) {
                throw new TrackingException(String.format("Failed to store position of segments %s for tracker %s to index %s", Arrays.toString(segment), this.tracker, index), e);
            }
        }, this.retryDelay, e -> this.running.get());
    }

    /**
     * Stops this tracker; only the first call after the tracker started has any effect.
     *
     * <p>When invoked from another thread, this waits for a batch that is being processed to finish,
     * or otherwise interrupts the tracker thread. Afterwards every configured batch interceptor is
     * shut down. If the calling thread is interrupted while waiting, the method returns immediately
     * and the interceptors are not shut down.
     */
    @Override
    public void cancel() {
        if (!this.running.compareAndSet(true, false)) {
            return;
        }
        if (!Thread.currentThread().equals(this.thread.get())) {
            if (this.processing) {
                while (this.processing) {
                    try {
                        Thread.sleep(1L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            } else {
                // The reference is null when run() has already exited or has not registered its
                // thread yet; there is nothing to interrupt in that case.
                Thread worker = this.thread.getAndSet(null);
                if (worker != null) {
                    try {
                        worker.interrupt();
                    } catch (Exception e) {
                        log.warn("Not allowed to cancel tracker {}", this.tracker.getName(), e);
                    }
                }
            }
        }
        this.tracker.getConfiguration().getBatchInterceptors().forEach(i -> {
            try {
                i.shutdown(this.tracker);
            } catch (Exception e) {
                log.warn("Failed to stop batch interceptor {}", i, e);
            }
        });
    }
}

