package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.ConsistentHashing;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.ClaimSegmentResult;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/CachingTrackingClient.class */
public class CachingTrackingClient implements TrackingClient {
    private static final Logger log = LoggerFactory.getLogger(CachingTrackingClient.class);
    private final WebsocketTrackingClient delegate;
    private final int maxCacheSize;
    private final ScheduledExecutorService scheduler;
    private final AtomicBoolean started;
    private volatile Registration registration;
    private final ConcurrentSkipListMap<Long, SerializedMessage> cache;
    private final Map<String, Runnable> waitingTrackers;

    public CachingTrackingClient(WebsocketTrackingClient websocketTrackingClient) {
        this(websocketTrackingClient, 1024);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<MessageBatch> read(String str, String str2, Long l, ConsumerConfiguration consumerConfiguration) {
        if (this.started.compareAndSet(false, true)) {
            ConsumerConfiguration build = ConsumerConfiguration.builder().ignoreSegment(true).clientControlledIndex(true).minIndex(Long.valueOf(IndexUtils.indexForCurrentTime())).name(CachingTrackingClient.class.getSimpleName()).build();
            this.registration = (Registration) FluxCapacitor.getOptionally().map(fluxCapacitor -> {
                return DefaultTracker.start((Consumer<List<SerializedMessage>>) this::cacheNewMessages, this.delegate.getMessageType(), build, fluxCapacitor);
            }).orElseGet(() -> {
                return DefaultTracker.start(this::cacheNewMessages, build, this.delegate);
            });
        }
        if (l == null || !this.cache.containsKey(l)) {
            return this.delegate.read(str, str2, l, consumerConfiguration);
        }
        Instant plus = Instant.now().plus((TemporalAmount) consumerConfiguration.getMaxWaitDuration());
        return this.delegate.claimSegment(str, str2, l, consumerConfiguration).thenCompose(claimSegmentResult -> {
            Long l2 = (Long) claimSegmentResult.getPosition().lowestIndexForSegment(claimSegmentResult.getSegment()).orElse(null);
            if (l2 == null) {
                return this.delegate.read(str, str2, l, consumerConfiguration);
            }
            CompletableFuture<MessageBatch> completableFuture = new CompletableFuture<>();
            MessageBatch messageBatch = getMessageBatch(consumerConfiguration, l2.longValue(), claimSegmentResult);
            if (messageBatch.isEmpty()) {
                waitForMessages(str, str2, ((Long) Optional.ofNullable(messageBatch.getLastIndex()).orElse(l2)).longValue(), consumerConfiguration, claimSegmentResult, plus, completableFuture);
            } else {
                completableFuture.complete(messageBatch);
            }
            return completableFuture;
        });
    }

    private void waitForMessages(String str, final String str2, long j, final ConsumerConfiguration consumerConfiguration, final ClaimSegmentResult claimSegmentResult, Instant instant, final CompletableFuture<MessageBatch> completableFuture) {
        final AtomicLong atomicLong = new AtomicLong(j);
        long millis = Duration.between(Instant.now(), instant).toMillis();
        if (millis <= 0) {
            completableFuture.complete(new MessageBatch(claimSegmentResult.getSegment(), List.of(), Long.valueOf(atomicLong.get())));
            return;
        }
        final ScheduledFuture<?> schedule = this.scheduler.schedule(() -> {
            try {
                if (completableFuture.complete(new MessageBatch(claimSegmentResult.getSegment(), List.of(), Long.valueOf(atomicLong.get())))) {
                    this.waitingTrackers.remove(str2);
                }
                if (atomicLong.get() > j) {
                    try {
                        storePosition(str, claimSegmentResult.getSegment(), atomicLong.get()).await();
                    } catch (Exception e) {
                        log.error("Failed to update position of {}", str, e);
                    }
                }
            } catch (Throwable th) {
                if (atomicLong.get() > j) {
                    try {
                        storePosition(str, claimSegmentResult.getSegment(), atomicLong.get()).await();
                    } catch (Exception e2) {
                        log.error("Failed to update position of {}", str, e2);
                    }
                }
                throw th;
            }
        }, millis, TimeUnit.MILLISECONDS);
        this.waitingTrackers.put(str2, new Runnable() { // from class: io.fluxcapacitor.javaclient.tracking.client.CachingTrackingClient.1
            @Override // java.lang.Runnable
            public void run() {
                MessageBatch messageBatch = CachingTrackingClient.this.getMessageBatch(consumerConfiguration, atomicLong.get(), claimSegmentResult);
                if (!messageBatch.isEmpty() && completableFuture.complete(messageBatch) && CachingTrackingClient.this.waitingTrackers.remove(str2, this)) {
                    schedule.cancel(false);
                } else {
                    atomicLong.updateAndGet(j2 -> {
                        return ((Long) Optional.ofNullable(messageBatch.getLastIndex()).orElse(Long.valueOf(j2))).longValue();
                    });
                }
            }
        });
    }

    protected MessageBatch getMessageBatch(ConsumerConfiguration consumerConfiguration, long j, ClaimSegmentResult claimSegmentResult) {
        List<SerializedMessage> list = (List) this.cache.tailMap((ConcurrentSkipListMap<Long, SerializedMessage>) Long.valueOf(j), false).values().stream().limit(consumerConfiguration.getMaxFetchSize()).collect(Collectors.toList());
        return new MessageBatch(claimSegmentResult.getSegment(), filterMessages(list, claimSegmentResult.getSegment(), claimSegmentResult.getPosition(), consumerConfiguration), list.isEmpty() ? null : list.get(list.size() - 1).getIndex());
    }

    protected List<SerializedMessage> filterMessages(List<SerializedMessage> list, int[] iArr, Position position, ConsumerConfiguration consumerConfiguration) {
        if (list.isEmpty()) {
            return list;
        }
        Predicate<? super SerializedMessage> predicate = serializedMessage -> {
            return (consumerConfiguration.getTypeFilter() == null || serializedMessage.getData().getType() == null || consumerConfiguration.getTypeFilter().matches(serializedMessage.getData().getType())) && position.isNewMessage(serializedMessage);
        };
        if (!consumerConfiguration.ignoreSegment()) {
            predicate = predicate.and(serializedMessage2 -> {
                return iArr[1] != 0 && serializedMessage2.getSegment().intValue() >= iArr[0] && serializedMessage2.getSegment().intValue() < iArr[1];
            });
        }
        return (List) list.stream().filter(predicate).collect(Collectors.toList());
    }

    protected void cacheNewMessages(List<SerializedMessage> list) {
        if (list.isEmpty()) {
            return;
        }
        this.cache.putAll((Map) list.stream().peek(serializedMessage -> {
            serializedMessage.setSegment(Integer.valueOf(serializedMessage.getSegment() == null ? ConsistentHashing.computeSegment(serializedMessage.getMessageId(), Position.MAX_SEGMENT) : serializedMessage.getSegment().intValue() % Position.MAX_SEGMENT));
        }).collect(Collectors.toMap((v0) -> {
            return v0.getIndex();
        }, Function.identity())));
        this.waitingTrackers.values().forEach((v0) -> {
            v0.run();
        });
        removeOldMessages();
    }

    protected synchronized void removeOldMessages() {
        int size = this.cache.size() - this.maxCacheSize;
        for (int i = 0; i < size; i++) {
            this.cache.pollFirstEntry();
        }
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public List<SerializedMessage> readFromIndex(long j, int i) {
        return this.delegate.readFromIndex(j, i);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable storePosition(String str, int[] iArr, long j, Guarantee guarantee) {
        return this.delegate.storePosition(str, iArr, j, guarantee);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable resetPosition(String str, long j, Guarantee guarantee) {
        return this.delegate.resetPosition(str, j, guarantee);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Position getPosition(String str) {
        return this.delegate.getPosition(str);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Awaitable disconnectTracker(String str, String str2, boolean z, Guarantee guarantee) {
        return this.delegate.disconnectTracker(str, str2, z, guarantee);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public MessageType getMessageType() {
        return this.delegate.getMessageType();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient, java.lang.AutoCloseable
    public void close() {
        Optional.ofNullable(this.registration).ifPresent((v0) -> {
            v0.cancel();
        });
        this.scheduler.shutdown();
        this.delegate.close();
    }

    @ConstructorProperties({"delegate", "maxCacheSize"})
    public CachingTrackingClient(WebsocketTrackingClient websocketTrackingClient, int i) {
        this.scheduler = Executors.newScheduledThreadPool(10);
        this.started = new AtomicBoolean();
        this.cache = new ConcurrentSkipListMap<>();
        this.waitingTrackers = new ConcurrentHashMap();
        this.delegate = websocketTrackingClient;
        this.maxCacheSize = i;
    }

    public WebsocketTrackingClient getDelegate() {
        return this.delegate;
    }
}
