package io.fluxcapacitor.javaclient.scheduling.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.Guarantee;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.SerializedSchedule;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.tracking.IndexUtils;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/fluxcapacitor/javaclient/scheduling/client/InMemorySchedulingClient.class */
public class InMemorySchedulingClient extends InMemoryMessageStore implements SchedulingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemorySchedulingClient.class);
    private final ConcurrentSkipListMap<Long, String> scheduleIdsByIndex = new ConcurrentSkipListMap<>();
    private volatile Clock clock = Clock.systemUTC();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore
    public List<SerializedMessage> filterMessages(Collection<SerializedMessage> collection) {
        long indexFromMillis = IndexUtils.indexFromMillis(this.clock.millis());
        return (List) super.filterMessages(collection).stream().filter(serializedMessage -> {
            return serializedMessage.getIndex().longValue() <= indexFromMillis && this.scheduleIdsByIndex.containsKey(serializedMessage.getIndex());
        }).collect(Collectors.toList());
    }

    @Override // io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient
    public Awaitable schedule(SerializedSchedule... serializedScheduleArr) {
        long j;
        List<SerializedSchedule> list = (List) Arrays.stream(serializedScheduleArr).filter(serializedSchedule -> {
            return (serializedSchedule.isIfAbsent() && this.scheduleIdsByIndex.containsValue(serializedSchedule.getScheduleId())) ? false : true;
        }).collect(Collectors.toList());
        for (SerializedSchedule serializedSchedule2 : list) {
            cancelSchedule(serializedSchedule2.getScheduleId());
            long indexFromMillis = IndexUtils.indexFromMillis(serializedSchedule2.getTimestamp());
            while (true) {
                j = indexFromMillis;
                if (this.scheduleIdsByIndex.putIfAbsent(Long.valueOf(j), serializedSchedule2.getScheduleId()) != null) {
                    indexFromMillis = j + 1;
                }
            }
            serializedSchedule2.getMessage().setIndex(Long.valueOf(j));
        }
        super.send(Guarantee.SENT, (SerializedMessage[]) list.stream().map((v0) -> {
            return v0.getMessage();
        }).toArray(i -> {
            return new SerializedMessage[i];
        }));
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient
    public Awaitable cancelSchedule(String str) {
        this.scheduleIdsByIndex.values().removeIf(str2 -> {
            return str2.equals(str);
        });
        return Awaitable.ready();
    }

    @Override // io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient
    public SerializedSchedule getSchedule(String str) {
        return (SerializedSchedule) this.scheduleIdsByIndex.entrySet().stream().filter(entry -> {
            return str.equals(entry.getValue());
        }).findFirst().map(entry2 -> {
            return new SerializedSchedule(str, IndexUtils.millisFromIndex(((Long) entry2.getKey()).longValue()), getMessage(((Long) entry2.getKey()).longValue()), false);
        }).orElse(null);
    }

    @Override // io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore, io.fluxcapacitor.javaclient.publishing.client.GatewayClient
    public Awaitable send(Guarantee guarantee, SerializedMessage... serializedMessageArr) {
        throw new UnsupportedOperationException("Use method #schedule instead");
    }

    public void setClock(@NonNull Clock clock) {
        if (clock == null) {
            throw new NullPointerException("clock is marked non-null but is null");
        }
        synchronized (this) {
            this.clock = clock;
            notifyAll();
        }
    }

    public List<Schedule> removeExpiredSchedules(Serializer serializer) {
        ConcurrentNavigableMap<Long, String> headMap = this.scheduleIdsByIndex.headMap((ConcurrentSkipListMap<Long, String>) Long.valueOf(IndexUtils.indexFromMillis(this.clock.millis())), true);
        List<Schedule> list = (List) headMap.entrySet().stream().map(entry -> {
            SerializedMessage message = getMessage(((Long) entry.getKey()).longValue());
            return new Schedule(serializer.deserializeMessages(Stream.of(message), MessageType.SCHEDULE).findFirst().get().getPayload(), message.getMetadata(), (String) entry.getValue(), IndexUtils.timestampFromIndex(((Long) entry.getKey()).longValue()));
        }).collect(Collectors.toList());
        headMap.clear();
        return list;
    }

    public List<Schedule> getSchedules(Serializer serializer) {
        return (List) this.scheduleIdsByIndex.entrySet().stream().map(entry -> {
            SerializedMessage message = getMessage(((Long) entry.getKey()).longValue());
            return new Schedule(serializer.deserializeMessages(Stream.of(message), MessageType.SCHEDULE).findFirst().get().getPayload(), message.getMetadata(), (String) entry.getValue(), IndexUtils.timestampFromIndex(((Long) entry.getKey()).longValue()));
        }).collect(Collectors.toList());
    }
}
