package io.atomix.protocols.raft.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.service.ServiceConfig;
import io.atomix.primitive.session.SessionId;
import io.atomix.primitive.session.SessionMetadata;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.service.RaftServiceContext;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.log.RaftLog;
import io.atomix.protocols.raft.storage.log.RaftLogReader;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
import io.atomix.storage.StorageLevel;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.OrderedFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.config.ConfigurationException;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import io.atomix.utils.serializer.Serializer;
import io.atomix.utils.time.WallClockTimestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;

/* loaded from: input_file:io/atomix/protocols/raft/impl/RaftServiceManager.class */
public class RaftServiceManager implements AutoCloseable {
    private static final Duration SNAPSHOT_INTERVAL = Duration.ofSeconds(10);
    private static final Duration SNAPSHOT_COMPLETION_DELAY = Duration.ofSeconds(10);
    private static final Duration COMPACT_DELAY = Duration.ofSeconds(10);
    private static final int SEGMENT_BUFFER_FACTOR = 5;
    private final Logger logger;
    private final RaftContext raft;
    private final ThreadContext stateContext;
    private final ThreadContext compactionContext;
    private final ThreadContextFactory threadContextFactory;
    private final RaftLog log;
    private final RaftLogReader reader;
    private final Map<Long, CompletableFuture> futures = Maps.newHashMap();
    private volatile CompletableFuture<Void> compactFuture;
    private long lastEnqueued;
    private long lastCompacted;

    public RaftServiceManager(RaftContext raftContext, ThreadContext threadContext, ThreadContext threadContext2, ThreadContextFactory threadContextFactory) {
        this.raft = (RaftContext) Preconditions.checkNotNull(raftContext, "state cannot be null");
        this.log = raftContext.getLog();
        this.reader = this.log.openReader(1L, RaftLogReader.Mode.COMMITS);
        this.stateContext = threadContext;
        this.compactionContext = threadContext2;
        this.threadContextFactory = threadContextFactory;
        this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class).addValue(raftContext.getName()).build());
        this.lastEnqueued = this.reader.getFirstIndex() - 1;
        scheduleSnapshots();
    }

    public ThreadContext executor() {
        return this.stateContext;
    }

    private boolean isRunningOutOfDiskSpace() {
        return this.raft.getStorage().statistics().getUsableSpace() < ((long) (this.raft.getStorage().maxLogSegmentSize() * SEGMENT_BUFFER_FACTOR)) || ((double) this.raft.getStorage().statistics().getUsableSpace()) / ((double) this.raft.getStorage().statistics().getTotalSpace()) < this.raft.getStorage().freeDiskBuffer();
    }

    private void scheduleSnapshots() {
        this.raft.getThreadContext().schedule(SNAPSHOT_INTERVAL, () -> {
            takeSnapshots(true, false);
        });
    }

    public CompletableFuture<Void> compact() {
        return takeSnapshots(false, true);
    }

    private CompletableFuture<Void> takeSnapshots(boolean z, boolean z2) {
        if (this.compactFuture != null) {
            if (z) {
                this.compactFuture.whenComplete((r3, th) -> {
                    scheduleSnapshots();
                });
            }
            return this.compactFuture;
        }
        long lastApplied = this.raft.getLastApplied();
        if (!this.raft.getLog().isCompactable(lastApplied) || this.raft.getLog().getCompactableIndex(lastApplied) <= this.lastCompacted) {
            if (z) {
                scheduleSnapshots();
            }
            return CompletableFuture.completedFuture(null);
        }
        boolean isRunningOutOfDiskSpace = isRunningOutOfDiskSpace();
        if (!z2 && this.raft.getStorage().storageLevel() != StorageLevel.MEMORY && this.raft.getStorage().dynamicCompaction() && !isRunningOutOfDiskSpace && this.raft.getLoadMonitor().isUnderHighLoad()) {
            this.logger.debug("Skipping compaction due to high load");
            if (z) {
                scheduleSnapshots();
            }
            return CompletableFuture.completedFuture(null);
        }
        this.logger.debug("Snapshotting services");
        this.lastCompacted = lastApplied;
        this.compactFuture = new OrderedFuture();
        takeSnapshots(lastApplied).whenCompleteAsync((snapshot, th2) -> {
            if (th2 == null) {
                scheduleCompletion(snapshot.persist());
            }
        }, (Executor) this.compactionContext);
        if (z) {
            this.compactFuture.whenComplete((r32, th3) -> {
                scheduleSnapshots();
            });
        }
        return this.compactFuture;
    }

    private CompletableFuture<Snapshot> takeSnapshots(long j) {
        ComposableFuture composableFuture = new ComposableFuture();
        this.stateContext.execute(() -> {
            try {
                composableFuture.complete(snapshot(j));
            } catch (Exception e) {
                composableFuture.completeExceptionally(e);
            }
        });
        return composableFuture;
    }

    private void scheduleCompletion(Snapshot snapshot) {
        this.stateContext.schedule(SNAPSHOT_COMPLETION_DELAY, () -> {
            if (!completeSnapshot(snapshot.index())) {
                scheduleCompletion(snapshot);
                return;
            }
            this.logger.debug("Completing snapshot {}", Long.valueOf(snapshot.index()));
            snapshot.complete();
            if (!this.raft.getLoadMonitor().isUnderHighLoad() || isRunningOutOfDiskSpace()) {
                compactLogs(snapshot.index());
            } else {
                scheduleCompaction(snapshot.index());
            }
        });
    }

    private void scheduleCompaction(long j) {
        this.logger.trace("Scheduling compaction in {}", COMPACT_DELAY);
        this.stateContext.schedule(COMPACT_DELAY, () -> {
            compactLogs(j);
        });
    }

    private void compactLogs(long j) {
        this.raft.getThreadContext().execute(() -> {
            this.logger.debug("Compacting logs up to index {}", Long.valueOf(j));
            try {
                try {
                    this.raft.getLog().compact(j);
                    this.compactFuture.complete(null);
                    this.compactFuture = null;
                    takeSnapshots(false, false);
                } catch (Exception e) {
                    this.logger.error("An exception occurred during log compaction: {}", e);
                    this.compactFuture.complete(null);
                    this.compactFuture = null;
                    takeSnapshots(false, false);
                }
            } catch (Throwable th) {
                this.compactFuture.complete(null);
                this.compactFuture = null;
                takeSnapshots(false, false);
                throw th;
            }
        });
    }

    public void applyAll(long j) {
        enqueueBatch(j);
    }

    public <T> CompletableFuture<T> apply(long j) {
        CompletableFuture<T> computeIfAbsent = this.futures.computeIfAbsent(Long.valueOf(j), l -> {
            return new CompletableFuture();
        });
        enqueueBatch(j);
        return computeIfAbsent;
    }

    private void enqueueBatch(long j) {
        while (this.lastEnqueued < j) {
            long j2 = this.lastEnqueued + 1;
            this.lastEnqueued = j2;
            enqueueIndex(j2);
        }
    }

    private void enqueueIndex(long j) {
        this.raft.getThreadContext().execute(() -> {
            applyIndex(j);
        });
    }

    private void applyIndex(long j) {
        if (!this.reader.hasNext() || this.reader.getNextIndex() != j) {
            CompletableFuture remove = this.futures.remove(Long.valueOf(j));
            if (remove != null) {
                this.logger.error("Cannot apply index " + j);
                remove.completeExceptionally(new IndexOutOfBoundsException("Cannot apply index " + j));
                return;
            }
            return;
        }
        Indexed next = this.reader.next();
        try {
            try {
                if (next.index() != j) {
                    throw new IllegalStateException("inconsistent index applying entry " + j + ": " + next);
                }
                CompletableFuture remove2 = this.futures.remove(Long.valueOf(j));
                apply((Indexed<? extends RaftLogEntry>) next).whenComplete((obj, th) -> {
                    if (remove2 != null) {
                        if (th == null) {
                            remove2.complete(obj);
                        } else {
                            remove2.completeExceptionally(th);
                        }
                    }
                });
                this.raft.setLastApplied(j);
            } catch (Exception e) {
                this.logger.error("Failed to apply {}: {}", next, e);
                this.raft.setLastApplied(j);
            }
        } catch (Throwable th2) {
            this.raft.setLastApplied(j);
            throw th2;
        }
    }

    public <T> CompletableFuture<T> apply(Indexed<? extends RaftLogEntry> indexed) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        this.stateContext.execute(() -> {
            this.logger.trace("Applying {}", indexed);
            try {
                if (indexed.type() == QueryEntry.class) {
                    applyQuery(indexed.cast()).whenComplete((operationResult, th) -> {
                        if (th != null) {
                            completableFuture.completeExceptionally(th);
                        } else {
                            completableFuture.complete(operationResult);
                        }
                    });
                } else {
                    install(indexed.index());
                    if (indexed.type() == CommandEntry.class) {
                        completableFuture.complete(applyCommand(indexed.cast()));
                    } else if (indexed.type() == OpenSessionEntry.class) {
                        completableFuture.complete(Long.valueOf(applyOpenSession(indexed.cast())));
                    } else if (indexed.type() == KeepAliveEntry.class) {
                        completableFuture.complete(applyKeepAlive(indexed.cast()));
                    } else if (indexed.type() == CloseSessionEntry.class) {
                        applyCloseSession(indexed.cast());
                        completableFuture.complete(null);
                    } else if (indexed.type() == MetadataEntry.class) {
                        completableFuture.complete(applyMetadata(indexed.cast()));
                    } else if (indexed.type() == InitializeEntry.class) {
                        completableFuture.complete(applyInitialize(indexed.cast()));
                    } else if (indexed.type() == ConfigurationEntry.class) {
                        completableFuture.complete(applyConfiguration(indexed.cast()));
                    } else {
                        completableFuture.completeExceptionally(new RaftException.ProtocolException("Unknown entry type", new Object[0]));
                    }
                }
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    private Snapshot snapshot(long j) {
        Snapshot newTemporarySnapshot = this.raft.getSnapshotStore().newTemporarySnapshot(j, new WallClockTimestamp());
        try {
            SnapshotWriter openWriter = newTemporarySnapshot.openWriter();
            Throwable th = null;
            try {
                try {
                    Iterator<RaftServiceContext> it = this.raft.getServices().iterator();
                    while (it.hasNext()) {
                        RaftServiceContext next = it.next();
                        openWriter.buffer().mark();
                        SnapshotWriter snapshotWriter = new SnapshotWriter(openWriter.buffer().writeInt(0).slice(), openWriter.snapshot());
                        snapshotService(snapshotWriter, next);
                        int position = snapshotWriter.buffer().position();
                        openWriter.buffer().reset().writeInt(position).skip(position);
                    }
                    if (openWriter != null) {
                        if (0 != 0) {
                            try {
                                openWriter.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openWriter.close();
                        }
                    }
                    return newTemporarySnapshot;
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            newTemporarySnapshot.close();
            throw e;
        }
    }

    private void snapshotService(SnapshotWriter snapshotWriter, RaftServiceContext raftServiceContext) {
        snapshotWriter.m90writeLong(((Long) raftServiceContext.serviceId().id()).longValue());
        snapshotWriter.m86writeString(raftServiceContext.serviceType().name());
        snapshotWriter.m86writeString(raftServiceContext.serviceName());
        byte[] encode = Serializer.using(raftServiceContext.serviceType().namespace()).encode(raftServiceContext.serviceConfig());
        snapshotWriter.m92writeInt(encode.length).writeBytes(encode);
        raftServiceContext.takeSnapshot(snapshotWriter);
    }

    private void install(long j) {
        Snapshot snapshot = this.raft.getSnapshotStore().getSnapshot(j - 1);
        if (snapshot != null) {
            this.logger.debug("Installing snapshot {}", snapshot);
            SnapshotReader openReader = snapshot.openReader();
            Throwable th = null;
            while (openReader.hasRemaining()) {
                try {
                    try {
                        int readInt = openReader.readInt();
                        if (readInt > 0) {
                            installService(new SnapshotReader(openReader.buffer().slice(readInt), openReader.snapshot()));
                            openReader.m82skip(readInt);
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } catch (Throwable th3) {
                    if (openReader != null) {
                        if (th != null) {
                            try {
                                openReader.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            openReader.close();
                        }
                    }
                    throw th3;
                }
            }
            if (openReader != null) {
                if (0 == 0) {
                    openReader.close();
                    return;
                }
                try {
                    openReader.close();
                } catch (Throwable th5) {
                    th.addSuppressed(th5);
                }
            }
        }
    }

    private void installService(SnapshotReader snapshotReader) {
        PrimitiveId from = PrimitiveId.from(snapshotReader.readLong());
        try {
            PrimitiveType primitiveType = this.raft.getPrimitiveTypes().getPrimitiveType(snapshotReader.readString());
            String readString = snapshotReader.readString();
            byte[] readBytes = snapshotReader.readBytes(snapshotReader.readInt());
            this.logger.debug("Installing service {} {}", from, readString);
            RaftServiceContext initializeService = initializeService(from, primitiveType, readString, readBytes);
            if (initializeService != null) {
                initializeService.installSnapshot(snapshotReader);
            }
        } catch (ConfigurationException e) {
            this.logger.error(e.getMessage(), e);
        }
    }

    private boolean completeSnapshot(long j) {
        long j2 = j;
        Iterator<RaftSession> it = this.raft.getSessions().getSessions().iterator();
        while (it.hasNext()) {
            j2 = Math.min(j2, it.next().getLastCompleted());
        }
        return j2 >= j;
    }

    private CompletableFuture<Void> applyInitialize(Indexed<InitializeEntry> indexed) {
        Iterator<RaftServiceContext> it = this.raft.getServices().iterator();
        while (it.hasNext()) {
            it.next().keepAliveSessions(indexed.index(), ((InitializeEntry) indexed.entry()).timestamp());
        }
        return CompletableFuture.completedFuture(null);
    }

    private CompletableFuture<Void> applyConfiguration(Indexed<ConfigurationEntry> indexed) {
        Iterator<RaftServiceContext> it = this.raft.getServices().iterator();
        while (it.hasNext()) {
            it.next().keepAliveSessions(indexed.index(), ((ConfigurationEntry) indexed.entry()).timestamp());
        }
        return CompletableFuture.completedFuture(null);
    }

    private long[] applyKeepAlive(Indexed<KeepAliveEntry> indexed) {
        long[] sessionIds = ((KeepAliveEntry) indexed.entry()).sessionIds();
        long[] commandSequenceNumbers = ((KeepAliveEntry) indexed.entry()).commandSequenceNumbers();
        long[] eventIndexes = ((KeepAliveEntry) indexed.entry()).eventIndexes();
        ArrayList arrayList = new ArrayList(sessionIds.length);
        HashSet hashSet = new HashSet();
        for (int i = 0; i < sessionIds.length; i++) {
            long j = sessionIds[i];
            long j2 = commandSequenceNumbers[i];
            long j3 = eventIndexes[i];
            RaftSession session = this.raft.getSessions().getSession(j);
            if (session != null && session.getService().keepAlive(indexed.index(), ((KeepAliveEntry) indexed.entry()).timestamp(), session, j2, j3)) {
                arrayList.add(Long.valueOf(j));
                hashSet.add(session.getService());
            }
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((RaftServiceContext) it.next()).completeKeepAlive(indexed.index(), ((KeepAliveEntry) indexed.entry()).timestamp());
        }
        return Longs.toArray(arrayList);
    }

    private RaftServiceContext getOrInitializeService(PrimitiveId primitiveId, PrimitiveType primitiveType, String str, byte[] bArr) {
        RaftServiceContext service = this.raft.getServices().getService(str);
        if (service == null) {
            service = initializeService(primitiveId, primitiveType, str, bArr);
        }
        return service;
    }

    private RaftServiceContext initializeService(PrimitiveId primitiveId, PrimitiveType primitiveType, String str, byte[] bArr) {
        RaftServiceContext service = this.raft.getServices().getService(str);
        ServiceConfig serviceConfig = (ServiceConfig) Serializer.using(primitiveType.namespace()).decode(bArr);
        RaftServiceContext raftServiceContext = new RaftServiceContext(primitiveId, str, primitiveType, serviceConfig, primitiveType.newService(serviceConfig), this.raft, this.threadContextFactory);
        this.raft.getServices().registerService(raftServiceContext);
        if (service != null) {
            this.raft.getSessions().removeSessions(service.serviceId());
        }
        return raftServiceContext;
    }

    private long applyOpenSession(Indexed<OpenSessionEntry> indexed) {
        PrimitiveType primitiveType = this.raft.getPrimitiveTypes().getPrimitiveType(((OpenSessionEntry) indexed.entry()).serviceType());
        RaftServiceContext orInitializeService = getOrInitializeService(PrimitiveId.from(indexed.index()), primitiveType, ((OpenSessionEntry) indexed.entry()).serviceName(), ((OpenSessionEntry) indexed.entry()).serviceConfig());
        if (orInitializeService == null) {
            throw new RaftException.UnknownService("Unknown service type " + ((OpenSessionEntry) indexed.entry()).serviceType(), new Object[0]);
        }
        return orInitializeService.openSession(indexed.index(), ((OpenSessionEntry) indexed.entry()).timestamp(), this.raft.getSessions().addSession(new RaftSession(SessionId.from(indexed.index()), MemberId.from(((OpenSessionEntry) indexed.entry()).memberId()), ((OpenSessionEntry) indexed.entry()).serviceName(), primitiveType, ((OpenSessionEntry) indexed.entry()).readConsistency(), ((OpenSessionEntry) indexed.entry()).minTimeout(), ((OpenSessionEntry) indexed.entry()).maxTimeout(), ((OpenSessionEntry) indexed.entry()).timestamp(), orInitializeService.serializer(), orInitializeService, this.raft, this.threadContextFactory)));
    }

    private void applyCloseSession(Indexed<CloseSessionEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(((CloseSessionEntry) indexed.entry()).session());
        if (session == null) {
            throw new RaftException.UnknownSession("Unknown session: " + ((CloseSessionEntry) indexed.entry()).session(), new Object[0]);
        }
        session.getService().closeSession(indexed.index(), ((CloseSessionEntry) indexed.entry()).timestamp(), session, ((CloseSessionEntry) indexed.entry()).expired());
    }

    private MetadataResult applyMetadata(Indexed<MetadataEntry> indexed) {
        if (((MetadataEntry) indexed.entry()).session() <= 0) {
            HashSet hashSet = new HashSet();
            for (RaftSession raftSession : this.raft.getSessions().getSessions()) {
                hashSet.add(new SessionMetadata(((Long) raftSession.sessionId().id()).longValue(), raftSession.primitiveName(), raftSession.primitiveType().name()));
            }
            return new MetadataResult(hashSet);
        }
        RaftSession session = this.raft.getSessions().getSession(((MetadataEntry) indexed.entry()).session());
        if (session == null) {
            this.logger.warn("Unknown session: " + ((MetadataEntry) indexed.entry()).session());
            throw new RaftException.UnknownSession("Unknown session: " + ((MetadataEntry) indexed.entry()).session(), new Object[0]);
        }
        HashSet hashSet2 = new HashSet();
        for (RaftSession raftSession2 : this.raft.getSessions().getSessions()) {
            if (raftSession2.primitiveName().equals(session.primitiveName())) {
                hashSet2.add(new SessionMetadata(((Long) raftSession2.sessionId().id()).longValue(), raftSession2.primitiveName(), raftSession2.primitiveType().name()));
            }
        }
        return new MetadataResult(hashSet2);
    }

    private OperationResult applyCommand(Indexed<CommandEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(((CommandEntry) indexed.entry()).session());
        if (session == null) {
            this.logger.debug("Unknown session: " + ((CommandEntry) indexed.entry()).session());
            throw new RaftException.UnknownSession("unknown session: " + ((CommandEntry) indexed.entry()).session(), new Object[0]);
        }
        this.raft.getLoadMonitor().recordEvent();
        return session.getService().executeCommand(indexed.index(), ((CommandEntry) indexed.entry()).sequenceNumber(), ((CommandEntry) indexed.entry()).timestamp(), session, ((CommandEntry) indexed.entry()).operation());
    }

    private CompletableFuture<OperationResult> applyQuery(Indexed<QueryEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(((QueryEntry) indexed.entry()).session());
        if (session != null) {
            return session.getService().executeQuery(indexed.index(), ((QueryEntry) indexed.entry()).sequenceNumber(), ((QueryEntry) indexed.entry()).timestamp(), session, ((QueryEntry) indexed.entry()).operation());
        }
        this.logger.warn("Unknown session: " + ((QueryEntry) indexed.entry()).session());
        return Futures.exceptionalFuture(new RaftException.UnknownSession("unknown session " + ((QueryEntry) indexed.entry()).session(), new Object[0]));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
