/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import io.fluxcapacitor.javaclient.tracking.TrackingConfiguration;
import io.fluxcapacitor.javaclient.tracking.client.TrackingClient;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InMemoryMessageStore
implements GatewayClient,
TrackingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemoryMessageStore.class);
    private final AtomicLong nextIndex = new AtomicLong();
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap();
    private final Map<String, String> trackers = new ConcurrentHashMap<String, String>();
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<String, Long>();
    private final List<Consumer<SerializedMessage>> monitors = new CopyOnWriteArrayList<Consumer<SerializedMessage>>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Awaitable send(SerializedMessage ... messages) {
        Arrays.stream(messages).forEach(m -> {
            if (m.getIndex() == null) {
                m.setIndex(Long.valueOf(this.nextIndex.getAndIncrement()));
            }
            this.messageLog.put(m.getIndex(), (SerializedMessage)m);
            this.monitors.forEach(monitor -> monitor.accept(m));
        });
        InMemoryMessageStore inMemoryMessageStore = this;
        synchronized (inMemoryMessageStore) {
            this.notifyAll();
        }
        return Awaitable.ready();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MessageBatch readAndWait(String consumer, String trackerId, Long previousLastIndex, TrackingConfiguration configuration) {
        if (!trackerId.equals(this.trackers.computeIfAbsent(consumer, c -> trackerId))) {
            return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), null);
        }
        long deadline = System.currentTimeMillis() + configuration.getMaxWaitDuration().toMillis();
        InMemoryMessageStore inMemoryMessageStore = this;
        synchronized (inMemoryMessageStore) {
            Long lastIndex;
            NavigableMap tailMap = Collections.emptyMap();
            while (System.currentTimeMillis() < deadline && (tailMap = this.messageLog.tailMap((Object)Optional.ofNullable(previousLastIndex).orElseGet(() -> this.getLastIndex(consumer)), false)).isEmpty()) {
                try {
                    this.wait(deadline - System.currentTimeMillis());
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), null);
                }
            }
            List<Object> messages = new ArrayList(tailMap.values());
            Long l = lastIndex = messages.isEmpty() ? null : ((SerializedMessage)messages.get(messages.size() - 1)).getIndex();
            if (configuration.getTypeFilter() != null) {
                messages = messages.stream().filter(m -> m.getData().getType().matches(configuration.getTypeFilter())).collect(Collectors.toList());
            }
            return new MessageBatch(new int[]{0, 1}, messages, lastIndex);
        }
    }

    @Override
    public CompletableFuture<MessageBatch> read(String consumer, String trackerId, Long lastIndex, TrackingConfiguration trackingConfiguration) {
        return CompletableFuture.completedFuture(this.readAndWait(consumer, trackerId, lastIndex, trackingConfiguration));
    }

    @Override
    public List<SerializedMessage> readFromIndex(long minIndex, int maxSize) {
        ArrayList list = new ArrayList(this.messageLog.tailMap((Object)minIndex).values());
        return list.subList(0, Math.min(maxSize, list.size()));
    }

    private long getLastIndex(String consumer) {
        return this.consumerTokens.computeIfAbsent(consumer, k -> -1L);
    }

    @Override
    public Awaitable storePosition(String consumer, int[] segment, long lastIndex) {
        return this.resetPosition(consumer, lastIndex);
    }

    @Override
    public Awaitable resetPosition(String consumer, long lastIndex) {
        this.consumerTokens.put(consumer, lastIndex);
        return Awaitable.ready();
    }

    @Override
    public Awaitable disconnectTracker(String consumer, String trackerId, boolean sendFinalEmptyBatch) {
        this.trackers.remove(consumer, trackerId);
        return Awaitable.ready();
    }

    public Registration registerMonitor(Consumer<SerializedMessage> monitor) {
        this.monitors.add(monitor);
        return () -> this.monitors.remove(monitor);
    }

    @Override
    public void close() {
    }
}

