/*
 * Decompiled with CFR 0.152.
 */
package io.fluxcapacitor.javaclient.scheduling.client;

import io.fluxcapacitor.common.Awaitable;
import io.fluxcapacitor.common.IndexUtils;
import io.fluxcapacitor.common.MessageType;
import io.fluxcapacitor.common.api.SerializedMessage;
import io.fluxcapacitor.common.api.scheduling.ScheduledMessage;
import io.fluxcapacitor.javaclient.common.serialization.Serializer;
import io.fluxcapacitor.javaclient.scheduling.Schedule;
import io.fluxcapacitor.javaclient.scheduling.client.SchedulingClient;
import io.fluxcapacitor.javaclient.tracking.client.InMemoryMessageStore;
import java.time.Clock;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * In-memory {@link SchedulingClient} built on top of {@link InMemoryMessageStore}.
 *
 * <p>Every active schedule is tracked in {@link #scheduleIdsByIndex}, keyed by the message index
 * derived from the schedule's timestamp. A stored message is only handed out by
 * {@link #filterMessages(Collection)} once its index is not later than the index of the current
 * {@link Clock} time and its index is still present in that map (i.e. it was not cancelled or
 * removed).
 */
public class InMemorySchedulingClient extends InMemoryMessageStore implements SchedulingClient {
    private static final Logger log = LoggerFactory.getLogger(InMemorySchedulingClient.class);

    /** Active schedules: message index (derived from the deadline) mapped to schedule id. */
    private final ConcurrentSkipListMap<Long, String> scheduleIdsByIndex = new ConcurrentSkipListMap<>();

    /** Time source used to decide which schedules are due; replaceable via {@link #setClock(Clock)}. */
    private volatile Clock clock = Clock.systemUTC();

    /**
     * Narrows the superclass' selection to messages that are due and still scheduled.
     *
     * @param messages candidate messages
     * @return the messages accepted by the superclass whose index is at most the index of the
     *         current clock time and whose index is still registered in {@link #scheduleIdsByIndex}
     */
    @Override
    protected List<SerializedMessage> filterMessages(Collection<SerializedMessage> messages) {
        long maximumIndex = IndexUtils.indexFromMillis(this.clock.millis());
        return super.filterMessages(messages).stream()
                .filter(m -> m.getIndex() <= maximumIndex && this.scheduleIdsByIndex.containsKey(m.getIndex()))
                .collect(Collectors.toList());
    }

    /**
     * Registers the given schedules and stores their messages.
     *
     * <p>A schedule flagged {@code ifAbsent} is skipped when a schedule with the same id is already
     * registered. For every remaining schedule any existing registration with the same id is
     * cancelled first, after which the schedule is registered under the first free index at or
     * after the index of its timestamp. That index is also written to the schedule's message.
     *
     * @param schedules the schedules to register
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable schedule(ScheduledMessage... schedules) {
        List<ScheduledMessage> filtered = Arrays.stream(schedules)
                .filter(s -> !s.isIfAbsent() || !this.scheduleIdsByIndex.containsValue(s.getScheduleId()))
                .collect(Collectors.toList());
        for (ScheduledMessage schedule : filtered) {
            // Re-scheduling an id replaces its previous registration.
            this.cancelSchedule(schedule.getScheduleId());
            long index = IndexUtils.indexFromMillis(schedule.getTimestamp());
            // Indices are unique keys: on collision probe upwards until a free slot is claimed.
            while (this.scheduleIdsByIndex.putIfAbsent(index, schedule.getScheduleId()) != null) {
                ++index;
            }
            schedule.getMessage().setIndex(index);
        }
        // Call the superclass directly; send(...) on this class is deliberately unsupported.
        super.send(filtered.stream().map(ScheduledMessage::getMessage).toArray(SerializedMessage[]::new));
        return Awaitable.ready();
    }

    /**
     * Removes every registration with the given schedule id from {@link #scheduleIdsByIndex}.
     * The stored message itself is not touched here; it is excluded later by
     * {@link #filterMessages(Collection)} because its index is no longer registered.
     *
     * @param scheduleId id of the schedule to cancel
     * @return an already completed {@link Awaitable}
     */
    @Override
    public Awaitable cancelSchedule(String scheduleId) {
        this.scheduleIdsByIndex.values().removeIf(s -> s.equals(scheduleId));
        return Awaitable.ready();
    }

    /**
     * Not supported by this client.
     *
     * @throws UnsupportedOperationException always; use {@link #schedule(ScheduledMessage...)}
     */
    @Override
    public Awaitable send(SerializedMessage... messages) {
        throw new UnsupportedOperationException("Use method #schedule instead");
    }

    /**
     * Replaces the clock used to determine which schedules are due and wakes all threads waiting
     * on this instance's monitor.
     *
     * @param clock the new clock, never {@code null}
     * @throws NullPointerException if {@code clock} is {@code null}
     */
    public void setClock(@NonNull Clock clock) {
        if (clock == null) {
            throw new NullPointerException("clock is marked non-null but is null");
        }
        synchronized (this) {
            this.clock = clock;
            // NOTE(review): presumably lets waiters in the superclass re-evaluate which messages
            // are due under the new clock - confirm against InMemoryMessageStore.
            this.notifyAll();
        }
    }

    /**
     * Returns all schedules whose index is at or before the index of the current clock time and
     * removes their registrations from {@link #scheduleIdsByIndex}.
     *
     * @param serializer used to deserialize the payload of each expired message
     * @return the expired schedules, in ascending index order
     * @throws java.util.NoSuchElementException if deserializing an expired message yields no result
     */
    public List<Schedule> removeExpiredSchedules(Serializer serializer) {
        // Live view on the backing map: clearing it below removes the expired registrations.
        NavigableMap<Long, String> expiredEntries =
                this.scheduleIdsByIndex.headMap(IndexUtils.indexFromMillis(this.clock.millis()), true);
        List<Schedule> result = expiredEntries.entrySet().stream().map(e -> {
            SerializedMessage m = this.getMessage(e.getKey());
            return new Schedule(
                    serializer.deserializeMessages(Stream.of(m), true, MessageType.SCHEDULE)
                            .findFirst().get().getPayload(),
                    m.getMetadata(), e.getValue(), IndexUtils.timestampFromIndex(e.getKey()));
        }).collect(Collectors.toList());
        expiredEntries.clear();
        return result;
    }
}

