/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.TimingUtils;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.configuration.client.Client;
import io.fluxcapacitor.javaclient.tracking.BatchInterceptor;
import io.fluxcapacitor.javaclient.tracking.BatchProcessingException;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.Tracker;
import io.fluxcapacitor.javaclient.tracking.TrackingException;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.time.Duration;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTracker
implements Runnable,
Registration {
    private static final Logger log = LoggerFactory.getLogger(DefaultTracker.class);
    private final Tracker tracker;
    private final Consumer<MessageBatch> processor;
    private final TrackingClient trackingClient;
    private final AtomicBoolean running = new AtomicBoolean();
    private final AtomicReference<Thread> thread = new AtomicReference();
    private final Duration retryDelay;
    private volatile Long lastProcessedIndex;
    private volatile boolean processing;

    public static Registration start(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration config, Client client) {
        List<DefaultTracker> instances = IntStream.range(0, config.getThreads()).mapToObj(i -> new DefaultTracker(consumer, config, client)).collect(Collectors.toList());
        ExecutorService executor = Executors.newFixedThreadPool(config.getThreads());
        instances.forEach(executor::execute);
        return () -> {
            instances.forEach(DefaultTracker::cancel);
            executor.shutdownNow();
        };
    }

    private DefaultTracker(Consumer<List<SerializedMessage>> consumer, ConsumerConfiguration config, Client client) {
        this.tracker = new Tracker(config.prependApplicationName() ? String.format("%s_%s", client.name(), config.getName()) : config.getName(), config.getTrackerIdFactory().apply(client), config, null);
        this.processor = BatchInterceptor.join(config.getBatchInterceptors()).intercept(b -> this.process((MessageBatch)b, consumer), this.tracker);
        this.trackingClient = client.getTrackingClient(config.getMessageType());
        this.retryDelay = Duration.ofSeconds(1L);
        this.lastProcessedIndex = config.getLastIndex();
    }

    @Override
    public void run() {
        if (this.running.compareAndSet(false, true)) {
            Tracker.current.set(this.tracker);
            this.thread.set(Thread.currentThread());
            while (this.running.get()) {
                MessageBatch batch = this.fetch(this.lastProcessedIndex);
                if (batch == null) continue;
                Tracker.current.set(this.tracker.withMessageBatch(batch));
                this.processor.accept(batch);
            }
            Tracker.current.remove();
        }
    }

    protected MessageBatch fetch(Long lastIndex) {
        return (MessageBatch)TimingUtils.retryOnFailure(() -> this.trackingClient.readAndWait(this.tracker.getName(), this.tracker.getTrackerId(), lastIndex, this.tracker.getConfiguration()), (Duration)this.retryDelay, e -> this.running.get());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    protected void process(MessageBatch messageBatch, Consumer<List<SerializedMessage>> consumer) {
        block10: {
            try {
                this.processing = true;
                List messages = messageBatch.getMessages();
                if (messages.isEmpty() || !this.running.get()) {
                    return;
                }
                try {
                    consumer.accept(messages);
                }
                catch (BatchProcessingException e) {
                    log.error("Consumer {} failed to handle batch of {} messages at index {} and did not handle exception. Consumer will be updated to the last processed index and then stopped.", new Object[]{this.tracker.getName(), messages.size(), e.getMessageIndex()});
                    this.updatePosition(messages.stream().map(SerializedMessage::getIndex).filter(i -> e.getMessageIndex() != null && i != null && i < e.getMessageIndex()).max(Comparator.naturalOrder()).orElse(null), messageBatch.getSegment());
                    this.processing = false;
                    this.cancel();
                    this.processing = false;
                    return;
                }
                catch (Exception e) {
                    log.error("Consumer {} failed to handle batch of {} messages and did not handle exception. Tracker will be stopped.", new Object[]{this.tracker.getName(), messages.size(), e});
                    this.processing = false;
                    this.cancel();
                    this.processing = false;
                    return;
                }
                this.updatePosition(messageBatch.getLastIndex(), messageBatch.getSegment());
                break block10;
                {
                    catch (Throwable throwable) {
                        throw throwable;
                    }
                }
            }
            finally {
                this.processing = false;
            }
        }
    }

    private void updatePosition(Long index, int[] segment) {
        if (index != null) {
            this.lastProcessedIndex = index;
            TimingUtils.retryOnFailure(() -> {
                try {
                    this.trackingClient.storePosition(this.tracker.getName(), segment, index).await();
                }
                catch (Exception e) {
                    throw new TrackingException(String.format("Failed to store position of segments %s for tracker %s to index %s", Arrays.toString(segment), this.tracker, index), e);
                }
            }, (Duration)this.retryDelay, e2 -> this.running.get());
        }
    }

    public void cancel() {
        if (this.running.compareAndSet(true, false)) {
            if (this.processing) {
                while (this.processing) {
                    try {
                        Thread.sleep(1L);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            } else {
                try {
                    this.thread.get().interrupt();
                }
                catch (Exception e) {
                    log.warn("Not allowed to cancel tracker {}", (Object)this.tracker.getName(), (Object)e);
                }
                finally {
                    this.thread.set(null);
                }
            }
            this.tracker.getConfiguration().getBatchInterceptors().forEach(i -> {
                try {
                    i.shutdown(this.tracker);
                }
                catch (Exception e) {
                    log.warn("Failed to stop batch interceptor {}", i, (Object)e);
                }
            });
        }
    }
}

