package io.fluxcapacitor.javaclient.tracking.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import io.fluxcapacitor.common.api.tracking.TrackingStrategy;
import io.fluxcapacitor.javaclient.publishing.client.GatewayClient;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/client/InMemoryMessageStore.class */
/**
 * In-memory message log that implements both {@link GatewayClient} (publishing) and
 * {@link TrackingClient} (reading).
 *
 * <p>Messages are kept in a {@link ConcurrentSkipListMap} keyed by message index. For each consumer
 * name the index stored via {@link #storePosition} / {@link #resetPosition} is remembered; a read
 * returns every message with a strictly greater index. Readers block on this instance's monitor and
 * are woken by {@link #send} through {@code notifyAll()}.
 */
public class InMemoryMessageStore implements GatewayClient, TrackingClient {
    /** Source of indices for messages that arrive without one. */
    private final AtomicLong nextIndex = new AtomicLong();
    /** All stored messages, ordered by index. */
    private final ConcurrentSkipListMap<Long, SerializedMessage> messageLog = new ConcurrentSkipListMap<>();
    /** Stored index per consumer name; {@code -1} is inserted on first lookup. */
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap<>();
    /** Callbacks invoked for every message passed to {@link #send}. */
    private final List<Consumer<SerializedMessage>> monitors = new CopyOnWriteArrayList<>();

    /**
     * Appends the given messages to the log, invokes all registered monitors for each message and
     * then wakes up every reader blocked in {@link #readAndWait}.
     *
     * <p>A message whose index is {@code null} gets the next value of the internal counter; a message
     * that already carries an index is stored under that index as is.
     *
     * @param serializedMessageArr the messages to store
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable send(SerializedMessage... serializedMessageArr) {
        for (SerializedMessage message : serializedMessageArr) {
            if (message.getIndex() == null) {
                message.setIndex(Long.valueOf(nextIndex.getAndIncrement()));
            }
            messageLog.put(message.getIndex(), message);
            monitors.forEach(monitor -> monitor.accept(message));
        }
        synchronized (this) {
            notifyAll();
        }
        return Awaitable.ready();
    }

    /**
     * Returns the messages stored after the consumer's last stored index, blocking until at least
     * one is available or the timeout elapses.
     *
     * <p>The log is always checked at least once, so a zero or negative timeout acts as a
     * non-blocking poll. The batch's last index is taken from the newest unread message
     * <em>before</em> the type filter is applied.
     *
     * @param str              consumer name used to look up the stored index
     * @param i                only {@code 0} is served; any other value yields an empty batch immediately
     * @param i2               not used by this implementation (the batch is not size-limited)
     * @param duration         maximum time to wait for messages
     * @param str2             optional regular expression the message data type must match; {@code null}
     *                         disables filtering
     * @param z                not used by this implementation
     * @param trackingStrategy not used by this implementation
     * @return a batch with segment {@code [0, 1]}; empty with a {@code null} last index when nothing
     *         became available or the waiting thread was interrupted (the interrupt flag is restored)
     */
    public MessageBatch readAndWait(String str, int i, int i2, Duration duration, String str2, boolean z, TrackingStrategy trackingStrategy) {
        if (i != 0) {
            return emptyBatch();
        }
        long deadline = System.currentTimeMillis() + duration.toMillis();
        synchronized (this) {
            ConcurrentNavigableMap<Long, SerializedMessage> unread =
                    messageLog.tailMap(Long.valueOf(getLastIndex(str)), false);
            while (unread.isEmpty()) {
                // Compute the remaining time exactly once per iteration: wait(0) would block until
                // notified and a negative argument throws IllegalArgumentException.
                long remainingMillis = deadline - System.currentTimeMillis();
                if (remainingMillis <= 0) {
                    break;
                }
                try {
                    wait(remainingMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return emptyBatch();
                }
                // Look the position up again; it may have been changed while this thread was waiting.
                unread = messageLog.tailMap(Long.valueOf(getLastIndex(str)), false);
            }
            List<SerializedMessage> messages = new ArrayList<>(unread.values());
            Long lastIndex = messages.isEmpty() ? null : messages.get(messages.size() - 1).getIndex();
            if (str2 != null) {
                messages = messages.stream()
                        .filter(message -> message.getData().getType().matches(str2))
                        .collect(Collectors.toList());
            }
            return new MessageBatch(new int[]{0, 1}, messages, lastIndex);
        }
    }

    /**
     * Performs {@link #readAndWait} on the calling thread and wraps its result in an already
     * completed future.
     */
    @Override
    public CompletableFuture<MessageBatch> read(String str, int i, int i2, Duration duration, String str2, boolean z, TrackingStrategy trackingStrategy) {
        return CompletableFuture.completedFuture(readAndWait(str, i, i2, duration, str2, z, trackingStrategy));
    }

    /**
     * Returns a snapshot of all stored messages whose index is greater than or equal to {@code j}.
     *
     * @param j the lowest index to include
     * @return a new list, ordered by index
     */
    @Override
    public List<SerializedMessage> readFromIndex(long j) {
        return new ArrayList<>(messageLog.tailMap(Long.valueOf(j)).values());
    }

    /** Returns the index stored for the given consumer, registering {@code -1} if none exists yet. */
    private long getLastIndex(String str) {
        return consumerTokens.computeIfAbsent(str, consumer -> -1L).longValue();
    }

    /** Creates the batch returned when there is nothing to deliver: segment {@code [0, 1]}, no messages, no index. */
    private static MessageBatch emptyBatch() {
        return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), (Long) null);
    }

    /**
     * Stores {@code j} as the index of the given consumer. The {@code iArr} argument is ignored;
     * this simply delegates to {@link #resetPosition}.
     *
     * @return an already completed {@link Awaitable}
     */
    public Awaitable storePosition(String str, int[] iArr, long j) {
        return resetPosition(str, j);
    }

    /**
     * Overwrites the stored index of the given consumer with {@code j}.
     *
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable resetPosition(String str, long j) {
        consumerTokens.put(str, Long.valueOf(j));
        return Awaitable.ready();
    }

    /** No-op in this implementation; returns an already completed {@link Awaitable}. */
    @Override
    public Awaitable disconnectTracker(String str, int i) {
        return Awaitable.ready();
    }

    /**
     * Registers a callback that is invoked for every message passed to {@link #send}.
     *
     * @param consumer the callback to add
     * @return a registration that removes the callback again
     */
    public Registration registerMonitor(Consumer<SerializedMessage> consumer) {
        monitors.add(consumer);
        return () -> monitors.remove(consumer);
    }

    /** No-op; this implementation does nothing on close. */
    @Override
    public void close() {
    }
}
