/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.Position;
import io.fluxcapacitor.javaclient.FluxCapacitor;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.tracking.ConsumerConfiguration;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.SimpleTrackerRead;
import io.fluxcapacitor.javaclient.tracking.client.TrackerRead;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory message log that serves as both {@link GatewayClient} (publishing side) and
 * {@link TrackingClient} (consuming side).
 *
 * <p>Messages are kept in an index-sorted map. Readers that find nothing to consume block on this
 * instance's monitor until {@link #send} notifies them or their deadline passes. Per consumer, only
 * the tracker that registered first is served messages; other trackers of the same consumer receive
 * an empty result once their deadline is reached (trackers of type {@link MessageType#RESULT} are
 * exempt from this check).
 */
public class InMemoryMessageStore implements GatewayClient, TrackingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemoryMessageStore.class);

    /** Runs the (potentially blocking) read tasks. */
    private final ExecutorService executor = Executors.newCachedThreadPool();
    /** Last index handed out by {@link #send} for messages that arrived without an index. */
    private final AtomicLong nextIndex = new AtomicLong();
    /** Tracker currently registered per consumer name. */
    private final Map<String, TrackerRead> trackers = new ConcurrentHashMap<>();
    /** Callbacks invoked for every message passed to {@link #send}. */
    private final List<Consumer<SerializedMessage>> monitors = new CopyOnWriteArrayList<>();
    /** The message log, sorted by message index. */
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap<>();
    /** Last stored index per consumer name. */
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<>();

    /**
     * Appends the given messages to the log, invokes all registered monitors for each of them and
     * wakes up any readers that are waiting for new messages.
     *
     * <p>A message without an index gets one assigned: the first assigned index is derived from the
     * current {@link FluxCapacitor#currentClock() clock}, every following one is the previously
     * assigned index plus one. Messages that already carry an index are stored under that index and
     * do not advance the internal counter.
     *
     * @param guarantee not used by this implementation
     * @param messages  the messages to append
     * @return an {@link Awaitable} that is already complete
     */
    @Override
    public Awaitable send(Guarantee guarantee, SerializedMessage... messages) {
        for (SerializedMessage message : messages) {
            if (message.getIndex() == null) {
                message.setIndex(Long.valueOf(nextIndex.updateAndGet(previous -> previous <= 0L
                        ? IndexUtils.indexFromMillis(FluxCapacitor.currentClock().millis())
                        : previous + 1L)));
            }
            messageLog.put(message.getIndex(), message);
            monitors.forEach(monitor -> monitor.accept(message));
        }
        // Readers in awaitBatch() wait on this instance's monitor.
        synchronized (this) {
            notifyAll();
        }
        return Awaitable.ready();
    }

    /**
     * Reads a batch of messages for the given tracker; delegates to {@link #read(TrackerRead)} using
     * a {@link SimpleTrackerRead}.
     */
    @Override
    public CompletableFuture<MessageBatch> read(String consumer, String trackerId, Long lastIndex,
                                                ConsumerConfiguration configuration) {
        return read(new SimpleTrackerRead(consumer, trackerId, lastIndex, configuration));
    }

    /**
     * Claims a segment range for the given tracker.
     *
     * @param trackerRead the tracker requesting a claim
     * @return a completed future holding {@code [0, Position.MAX_SEGMENT]} if this tracker is the one
     * registered for its consumer (or is a result tracker); otherwise a future that yields
     * {@code [0, 0]} once the tracker's deadline has been reached
     */
    public CompletableFuture<int[]> claimSegment(TrackerRead trackerRead) {
        if (isClaimedByOtherTracker(trackerRead)) {
            return CompletableFuture.supplyAsync(() -> new int[]{0, 0},
                    CompletableFuture.delayedExecutor(millisUntilDeadline(trackerRead), TimeUnit.MILLISECONDS));
        }
        return CompletableFuture.completedFuture(new int[]{0, Position.MAX_SEGMENT});
    }

    /**
     * Reads the next batch of messages for the given tracker.
     *
     * <p>If another tracker is registered for the same consumer, the returned future yields an empty
     * batch with segment {@code [0, 0]} once the deadline has been reached. Otherwise the read runs
     * on the internal executor and waits (at most until the tracker's deadline) for messages beyond
     * the tracker's last index to become available.
     *
     * @param trackerRead the tracker performing the read
     * @return a future holding the batch; it completes exceptionally if the read task fails
     */
    public CompletableFuture<MessageBatch> read(TrackerRead trackerRead) {
        if (isClaimedByOtherTracker(trackerRead)) {
            return CompletableFuture.supplyAsync(
                    () -> new MessageBatch(new int[]{0, 0}, Collections.emptyList(), null),
                    CompletableFuture.delayedExecutor(millisUntilDeadline(trackerRead), TimeUnit.MILLISECONDS));
        }
        CompletableFuture<MessageBatch> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(awaitBatch(trackerRead));
            } catch (Throwable e) {
                // Never leave the caller hanging on a future that cannot complete anymore.
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    /**
     * Blocks on this instance's monitor until messages are available for the tracker, the tracker's
     * deadline has passed or the current thread is interrupted, and then assembles the batch.
     *
     * <p>If the deadline has already passed on entry the log is not consulted and the batch is empty.
     */
    private MessageBatch awaitBatch(TrackerRead trackerRead) {
        synchronized (this) {
            Map<Long, SerializedMessage> tailMap = Collections.emptyMap();
            while (millisUntilDeadline(trackerRead) > 0L) {
                // Re-resolved on every pass: the stored position may have changed while waiting.
                Long fromIndex = Optional.ofNullable(trackerRead.getLastTrackerIndex())
                        .orElseGet(() -> getLastIndex(trackerRead.getConsumerName()));
                tailMap = messageLog.tailMap(fromIndex, false);
                if (!shouldWait(tailMap)) {
                    break;
                }
                // Object.wait(0) blocks indefinitely and a negative timeout throws, so make sure
                // there is still time left right before waiting.
                long timeout = millisUntilDeadline(trackerRead);
                if (timeout <= 0L) {
                    break;
                }
                try {
                    wait(timeout);
                } catch (InterruptedException e) {
                    // Restore the flag and stop waiting; looping on would make every subsequent
                    // wait() throw immediately and spin until the deadline.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            List<SerializedMessage> candidates = filterMessages(new ArrayList<>(tailMap.values()));
            List<SerializedMessage> page =
                    candidates.subList(0, Math.min(candidates.size(), trackerRead.getMaxSize()));
            // The last index is taken before applying canHandle, so the reported position also
            // covers messages that this tracker skips.
            Long lastIndex = page.isEmpty() ? null : page.get(page.size() - 1).getIndex();
            List<SerializedMessage> handled =
                    page.stream().filter(trackerRead::canHandle).collect(Collectors.toList());
            // NOTE(review): 128 is presumably the same value as Position.MAX_SEGMENT used by
            // claimSegment - confirm before unifying the two.
            return new MessageBatch(new int[]{0, 128}, handled, lastIndex);
        }
    }

    /**
     * Returns whether a tracker other than the given one is registered for the given tracker's
     * consumer. If no tracker is registered for that consumer yet, the given tracker is registered
     * as a side effect. Result trackers are neither checked nor registered.
     */
    private boolean isClaimedByOtherTracker(TrackerRead trackerRead) {
        if (trackerRead.getMessageType() == MessageType.RESULT) {
            return false;
        }
        TrackerRead registered = trackers.computeIfAbsent(trackerRead.getConsumerName(), c -> trackerRead);
        return !Objects.equals(trackerRead.getTrackerId(), registered.getTrackerId());
    }

    /** Returns the number of milliseconds left until the tracker's deadline, never negative. */
    private long millisUntilDeadline(TrackerRead trackerRead) {
        return Math.max(0L, trackerRead.getDeadline() - System.currentTimeMillis());
    }

    /**
     * Decides whether a reader should keep waiting given the messages after its last index.
     *
     * @param tailMap the messages after the reader's last index, keyed by index
     * @return {@code true} if {@link #filterMessages} leaves no messages to return
     */
    protected boolean shouldWait(Map<Long, SerializedMessage> tailMap) {
        return filterMessages(tailMap.values()).isEmpty();
    }

    /**
     * Hook for subclasses to narrow down the messages returned to readers. This implementation does
     * not drop anything: it returns the given messages as a list (the same instance if it already is
     * a {@link List}, a copy otherwise) or an empty list if there are none.
     *
     * @param messages the candidate messages
     * @return the messages eligible for reading
     */
    protected List<SerializedMessage> filterMessages(Collection<SerializedMessage> messages) {
        if (messages.isEmpty()) {
            return Collections.emptyList();
        }
        return messages instanceof List ? (List<SerializedMessage>) messages : new ArrayList<>(messages);
    }

    /**
     * Returns up to {@code maxSize} messages whose index is greater than or equal to
     * {@code minIndex}, in index order.
     */
    @Override
    public List<SerializedMessage> readFromIndex(long minIndex, int maxSize) {
        List<SerializedMessage> fromIndex = new ArrayList<>(messageLog.tailMap(minIndex).values());
        return fromIndex.subList(0, Math.min(maxSize, fromIndex.size()));
    }

    /** Returns the stored index of the consumer, initialising it to {@code -1} if absent. */
    private long getLastIndex(String consumer) {
        return consumerTokens.computeIfAbsent(consumer, k -> -1L);
    }

    /**
     * Stores the last index of the consumer. The segment is ignored; this simply delegates to
     * {@link #resetPosition}.
     */
    @Override
    public Awaitable storePosition(String consumer, int[] segment, long lastIndex) {
        return resetPosition(consumer, lastIndex);
    }

    /**
     * Sets the last index of the consumer to the given value.
     *
     * @return an {@link Awaitable} that is already complete
     */
    @Override
    public Awaitable resetPosition(String consumer, long lastIndex) {
        consumerTokens.put(consumer, lastIndex);
        return Awaitable.ready();
    }

    /**
     * Returns a {@link Position} created from the consumer's stored index, or {@code null} if no
     * index has been stored for the consumer.
     */
    @Override
    public Position getPosition(String consumer) {
        return Optional.ofNullable(consumerTokens.get(consumer)).map(Position::new).orElse(null);
    }

    /**
     * Unregisters every tracker that has the given tracker id. The {@code consumer} and
     * {@code sendFinalEmptyBatch} arguments are not used by this implementation.
     *
     * @return an {@link Awaitable} that is already complete
     */
    @Override
    public Awaitable disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch) {
        disconnectTrackersMatching(t -> Objects.equals(trackerId, t.getTrackerId()));
        return Awaitable.ready();
    }

    /**
     * Unregisters all trackers matching the given predicate.
     *
     * @param predicate selects the trackers to remove
     * @param <T>       the tracker type the predicate operates on; every registered tracker is
     *                  passed to the predicate, so a predicate typed to a subtype may fail with a
     *                  {@link ClassCastException} if trackers of another type are registered
     */
    public <T extends TrackerRead> void disconnectTrackersMatching(Predicate<T> predicate) {
        trackers.values().removeIf(tracker -> {
            // Unchecked by design: the caller chooses T, see the note on the type parameter.
            @SuppressWarnings("unchecked")
            T candidate = (T) tracker;
            return predicate.test(candidate);
        });
    }

    /**
     * Registers a callback that is invoked for every message passed to {@link #send}.
     *
     * @param monitor the callback to register
     * @return a registration that removes the callback again
     */
    public Registration registerMonitor(Consumer<SerializedMessage> monitor) {
        monitors.add(monitor);
        return () -> monitors.remove(monitor);
    }

    /**
     * Shuts down the executor used for reads. Reads that are already running are not interrupted.
     */
    @Override
    public void close() {
        executor.shutdown();
    }

    /** Returns the message stored under the given index, or {@code null} if there is none. */
    protected SerializedMessage getMessage(long index) {
        return messageLog.get(index);
    }
}

