package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.ObjectUtils;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.publishing.client.MessageDispatch;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import java.beans.ConstructorProperties;
import java.time.Duration;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/InMemoryMessageStore.class */
public class InMemoryMessageStore implements GatewayClient, TrackingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemoryMessageStore.class);
    private final Set<Consumer<MessageDispatch>> monitors;
    private final ExecutorService executor;
    private final AtomicLong nextIndex;
    private final Map<String, TrackerRead> trackers;
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog;
    private final Map<String, Long> consumerTokens;
    private final MessageType messageType;
    private final Duration messageExpiration;

    public InMemoryMessageStore(MessageType messageType) {
        this(messageType, Duration.ofMinutes(2L));
    }

    @Override // io.fluxcapacitor.javaclient.publishing.client.GatewayClient
    public CompletableFuture<Void> send(Guarantee guarantee, SerializedMessage... serializedMessageArr) {
        try {
            synchronized (this) {
                Arrays.stream(serializedMessageArr).forEach(serializedMessage -> {
                    if (serializedMessage.getIndex() == null) {
                        serializedMessage.setIndex(Long.valueOf(this.nextIndex.updateAndGet((v0) -> {
                            return IndexUtils.nextIndex(v0);
                        })));
                    }
                    this.messageLog.put(serializedMessage.getIndex(), serializedMessage);
                });
                if (this.messageExpiration != null) {
                    purgeExpiredMessages(this.messageExpiration);
                }
                notifyAll();
            }
            CompletableFuture<Void> completedFuture = CompletableFuture.completedFuture(null);
            if (!this.monitors.isEmpty()) {
                MessageDispatch messageDispatch = new MessageDispatch(Arrays.asList(serializedMessageArr), this.messageType);
                this.monitors.forEach(consumer -> {
                    consumer.accept(messageDispatch);
                });
            }
            return completedFuture;
        } catch (Throwable th) {
            if (!this.monitors.isEmpty()) {
                MessageDispatch messageDispatch2 = new MessageDispatch(Arrays.asList(serializedMessageArr), this.messageType);
                this.monitors.forEach(consumer2 -> {
                    consumer2.accept(messageDispatch2);
                });
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void purgeExpiredMessages(Duration duration) {
        this.messageLog.headMap((ConcurrentSkipListMap<Long, SerializedMessage>) Long.valueOf(IndexUtils.indexFromMillis(FluxCapacitor.currentTime().minus((TemporalAmount) duration).toEpochMilli()))).clear();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<MessageBatch> read(String str, String str2, Long l, ConsumerConfiguration consumerConfiguration) {
        return read(new SimpleTrackerRead(str, str2, l, consumerConfiguration, this.messageType));
    }

    public CompletableFuture<int[]> claimSegment(TrackerRead trackerRead) {
        return (trackerRead.getMessageType() == MessageType.RESULT || Objects.equals(trackerRead.getTrackerId(), this.trackers.computeIfAbsent(trackerRead.getConsumer(), str -> {
            return trackerRead;
        }).getTrackerId())) ? CompletableFuture.completedFuture(new int[]{0, Position.MAX_SEGMENT}) : CompletableFuture.supplyAsync(() -> {
            return new int[]{0, 0};
        }, CompletableFuture.delayedExecutor(trackerRead.getDeadline() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    }

    public CompletableFuture<MessageBatch> read(TrackerRead trackerRead) {
        if (trackerRead.getMessageType() != MessageType.RESULT && !Objects.equals(trackerRead.getTrackerId(), this.trackers.computeIfAbsent(trackerRead.getConsumer(), str -> {
            return trackerRead;
        }).getTrackerId())) {
            log.debug("Delaying read by secondary tracker {} (message type {})", trackerRead.getConsumer(), this.messageType);
            return CompletableFuture.supplyAsync(() -> {
                return new MessageBatch(new int[]{0, 0}, Collections.emptyList(), (Long) null);
            }, CompletableFuture.delayedExecutor(trackerRead.getDeadline() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
        }
        CompletableFuture<MessageBatch> completableFuture = new CompletableFuture<>();
        this.executor.execute(() -> {
            Map<Long, SerializedMessage> emptyMap = Collections.emptyMap();
            synchronized (this) {
                while (System.currentTimeMillis() < trackerRead.getDeadline()) {
                    ConcurrentNavigableMap<Long, SerializedMessage> tailMap = this.messageLog.tailMap((ConcurrentSkipListMap<Long, SerializedMessage>) Optional.ofNullable(trackerRead.getLastIndex()).orElseGet(() -> {
                        return Long.valueOf(getLastIndex(trackerRead.getConsumer()));
                    }), false);
                    emptyMap = tailMap;
                    if (!shouldWait(tailMap)) {
                        break;
                    }
                    long deadline = trackerRead.getDeadline() - System.currentTimeMillis();
                    if (deadline > 0) {
                        try {
                            wait(deadline);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                }
            }
            ArrayList arrayList = new ArrayList(filterMessages(emptyMap.values()));
            List subList = arrayList.subList(0, Math.min(arrayList.size(), trackerRead.getMaxSize()));
            Long index = subList.isEmpty() ? null : ((SerializedMessage) subList.get(subList.size() - 1)).getIndex();
            Stream stream = subList.stream();
            Objects.requireNonNull(trackerRead);
            completableFuture.complete(new MessageBatch(new int[]{0, 128}, (List) stream.filter(trackerRead::canHandle).collect(Collectors.toList()), index));
        });
        return completableFuture;
    }

    protected boolean shouldWait(Map<Long, SerializedMessage> map) {
        return filterMessages(map.values()).isEmpty();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<SerializedMessage> filterMessages(Collection<SerializedMessage> collection) {
        return collection;
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public List<SerializedMessage> readFromIndex(long j, int i) {
        ArrayList arrayList = new ArrayList(this.messageLog.tailMap((ConcurrentSkipListMap<Long, SerializedMessage>) Long.valueOf(j)).values());
        return arrayList.subList(0, Math.min(i, arrayList.size()));
    }

    private long getLastIndex(String str) {
        return this.consumerTokens.computeIfAbsent(str, str2 -> {
            return -1L;
        }).longValue();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<Void> storePosition(String str, int[] iArr, long j, Guarantee guarantee) {
        return resetPosition(str, j, guarantee);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<Void> resetPosition(String str, long j, Guarantee guarantee) {
        this.consumerTokens.put(str, Long.valueOf(j));
        return CompletableFuture.completedFuture(null);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public Position getPosition(String str) {
        return (Position) Optional.ofNullable(this.consumerTokens.get(str)).map((v1) -> {
            return new Position(v1);
        }).orElse(null);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public CompletableFuture<Void> disconnectTracker(String str, String str2, boolean z, Guarantee guarantee) {
        disconnectTrackersMatching(trackerRead -> {
            return Objects.equals(str2, trackerRead.getTrackerId());
        });
        return CompletableFuture.completedFuture(null);
    }

    public <T extends TrackerRead> void disconnectTrackersMatching(Predicate<T> predicate) {
        this.trackers.values().removeIf(trackerRead -> {
            return predicate.test(trackerRead);
        });
    }

    @Override // io.fluxcapacitor.javaclient.publishing.client.GatewayClient, java.lang.AutoCloseable
    public void close() {
        this.executor.shutdownNow();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SerializedMessage getMessage(long j) {
        return this.messageLog.get(Long.valueOf(j));
    }

    public Registration registerMonitor(Consumer<MessageDispatch> consumer) {
        this.monitors.add(consumer);
        return () -> {
            this.monitors.remove(consumer);
        };
    }

    @ConstructorProperties({"messageType", "messageExpiration"})
    public InMemoryMessageStore(MessageType messageType, Duration duration) {
        this.monitors = new CopyOnWriteArraySet();
        this.executor = Executors.newCachedThreadPool(ObjectUtils.newThreadFactory("InMemoryMessageStore"));
        this.nextIndex = new AtomicLong();
        this.trackers = new ConcurrentHashMap();
        this.messageLog = new ConcurrentSkipListMap<>();
        this.consumerTokens = new ConcurrentHashMap();
        this.messageType = messageType;
        this.messageExpiration = duration;
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.TrackingClient
    public MessageType getMessageType() {
        return this.messageType;
    }
}
