package io.fluxcapacitor.javaclient.tracking;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Registration;
import io.fluxcapacitor.common.api.Message;
import io.fluxcapacitor.common.api.tracking.MessageBatch;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/* loaded from: input_file:io/fluxcapacitor/javaclient/tracking/InMemoryMessageStore.class */
public class InMemoryMessageStore implements ProducerService, ConsumerService {
    private final AtomicLong nextIndex = new AtomicLong();
    private final ConcurrentSkipListMap<Long, Message> messageLog = new ConcurrentSkipListMap<>();
    private final Map<String, Long> consumerTokens = new ConcurrentHashMap();
    private final List<Consumer<Message>> monitors = new CopyOnWriteArrayList();

    @Override // io.fluxcapacitor.javaclient.tracking.ProducerService
    public Awaitable send(Message... messageArr) {
        Arrays.stream(messageArr).forEach(message -> {
            long andIncrement = this.nextIndex.getAndIncrement();
            message.setIndex(Long.valueOf(andIncrement));
            this.messageLog.put(Long.valueOf(andIncrement), message);
        });
        synchronized (this) {
            notifyAll();
        }
        return () -> {
        };
    }

    @Override // io.fluxcapacitor.javaclient.tracking.ConsumerService
    public MessageBatch read(String str, int i, int i2, Duration duration) {
        MessageBatch messageBatch;
        long currentTimeMillis = System.currentTimeMillis() + duration.toMillis();
        synchronized (this) {
            Map emptyMap = Collections.emptyMap();
            while (System.currentTimeMillis() < currentTimeMillis) {
                ConcurrentNavigableMap<Long, Message> tailMap = this.messageLog.tailMap((ConcurrentSkipListMap<Long, Message>) Long.valueOf(getLastToken(str)), false);
                emptyMap = tailMap;
                if (!tailMap.isEmpty()) {
                    break;
                }
                try {
                    wait(currentTimeMillis - System.currentTimeMillis());
                } catch (InterruptedException e) {
                    Thread.interrupted();
                    return new MessageBatch(new int[]{0, 1}, Collections.emptyList(), (Long) null);
                }
            }
            ArrayList arrayList = new ArrayList(emptyMap.values());
            messageBatch = new MessageBatch(new int[]{0, 1}, arrayList, arrayList.isEmpty() ? null : ((Message) arrayList.get(arrayList.size() - 1)).getIndex());
        }
        return messageBatch;
    }

    private long getLastToken(String str) {
        return this.consumerTokens.computeIfAbsent(str, str2 -> {
            return -1L;
        }).longValue();
    }

    @Override // io.fluxcapacitor.javaclient.tracking.ConsumerService
    public void storePosition(String str, int[] iArr, long j) {
        this.consumerTokens.put(str, Long.valueOf(j));
    }

    public Registration registerMonitor(Consumer<Message> consumer) {
        this.monitors.add(consumer);
        return () -> {
            return this.monitors.remove(consumer);
        };
    }
}
