package io.atomix.protocols.raft.impl;

import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import io.atomix.cluster.NodeId;
import io.atomix.primitive.PrimitiveId;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.session.SessionId;
import io.atomix.primitive.session.SessionMetadata;
import io.atomix.protocols.raft.RaftException;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.ReadConsistency;
import io.atomix.protocols.raft.service.RaftServiceContext;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.log.RaftLog;
import io.atomix.protocols.raft.storage.log.RaftLogReader;
import io.atomix.protocols.raft.storage.log.entry.CloseSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.CommandEntry;
import io.atomix.protocols.raft.storage.log.entry.ConfigurationEntry;
import io.atomix.protocols.raft.storage.log.entry.InitializeEntry;
import io.atomix.protocols.raft.storage.log.entry.KeepAliveEntry;
import io.atomix.protocols.raft.storage.log.entry.MetadataEntry;
import io.atomix.protocols.raft.storage.log.entry.OpenSessionEntry;
import io.atomix.protocols.raft.storage.log.entry.QueryEntry;
import io.atomix.protocols.raft.storage.log.entry.RaftLogEntry;
import io.atomix.protocols.raft.storage.snapshot.Snapshot;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.storage.journal.Indexed;
import io.atomix.utils.concurrent.Futures;
import io.atomix.utils.concurrent.ThreadContextFactory;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;

/* loaded from: input_file:io/atomix/protocols/raft/impl/RaftServiceManager.class */
public class RaftServiceManager implements AutoCloseable {
    private final Logger logger;
    private final RaftContext raft;
    private final ThreadContextFactory threadContextFactory;
    private final RaftLog log;
    private final RaftLogReader reader;

    public RaftServiceManager(RaftContext raftContext, ThreadContextFactory threadContextFactory) {
        this.raft = (RaftContext) Preconditions.checkNotNull(raftContext, "state cannot be null");
        this.log = raftContext.getLog();
        this.reader = this.log.openReader(1L, RaftLogReader.Mode.COMMITS);
        this.threadContextFactory = threadContextFactory;
        this.logger = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(RaftServer.class).addValue(raftContext.getName()).build2());
    }

    /**
     * Schedules application of all entries up to the given index on the Raft thread context.
     *
     * <p>Nothing is scheduled when {@code j} does not exceed the context's last applied index. The
     * future returned by {@link #apply(long)} is discarded.
     *
     * @param j the index up to which entries should be applied
     */
    public void applyAll(long j) {
        if (j <= this.raft.getLastApplied()) {
            // Already applied; nothing to do.
            return;
        }
        this.raft.getThreadContext().execute(() -> {
            apply(j);
        });
    }

    /**
     * Applies committed entries from the log reader up to and including the given index and returns
     * the result of applying the entry at that index.
     *
     * <p>Entries below {@code j} are applied for their side effects only: a failure is logged and
     * the loop moves on. After each entry is read, the context's last applied index is advanced to
     * that entry's index whether or not applying it succeeded.
     *
     * <p>The returned future is:
     * <ul>
     *   <li>the future produced by applying the entry at {@code j}, when that entry is read;</li>
     *   <li>completed with {@code null} when the reader is already positioned past {@code j};</li>
     *   <li>failed with the thrown exception when applying the entry at {@code j} throws;</li>
     *   <li>failed with {@link IndexOutOfBoundsException} when the next index is non-sequential
     *       (and not the first index of the log), is behind the last applied index, or when the
     *       reader runs out of entries before reaching {@code j}.</li>
     * </ul>
     *
     * @param j the index of the entry whose result is requested
     * @param <T> the result type of the entry at {@code j}
     * @return a future for the result of the entry at {@code j}
     */
    public <T> CompletableFuture<T> apply(long j) {
        while (this.reader.hasNext()) {
            long nextIndex = this.reader.getNextIndex();
            long lastApplied = this.raft.getLastApplied();

            // A gap is only acceptable when the reader sits at the very first entry of the log.
            if (nextIndex > lastApplied + 1 && nextIndex != this.reader.getFirstIndex()) {
                this.logger.error("Cannot apply non-sequential index {} unless it's the first entry in the log: {}", nextIndex, this.reader.getFirstIndex());
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply non-sequential index unless it's the first entry in the log"));
            }
            if (nextIndex < lastApplied) {
                this.logger.error("Cannot apply duplicate entry at index {}", nextIndex);
                return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot apply duplicate entry at index " + nextIndex));
            }

            // The reader is already past the requested index: record the position without
            // consuming the entry and report an empty result.
            if (nextIndex > j) {
                this.raft.setLastApplied(nextIndex);
                return Futures.completedFuture(null);
            }

            Indexed<RaftLogEntry> entry = this.reader.next();

            if (nextIndex < j) {
                // Intermediate entry: apply for side effects only and keep going on failure.
                try {
                    apply(entry);
                    restoreIndex(entry.index());
                } catch (Exception e) {
                    // The trailing exception is logged as the throwable, so it needs no placeholder.
                    this.logger.error("Failed to apply {}", entry, e);
                } finally {
                    this.raft.setLastApplied(nextIndex);
                }
                continue;
            }

            // nextIndex == j: this is the entry the caller asked for.
            try {
                if (entry.index() != j) {
                    throw new IllegalStateException("inconsistent index applying entry " + j + ": " + entry);
                }
                CompletableFuture<T> result = apply(entry);
                restoreIndex(entry.index());
                return result;
            } catch (Exception e) {
                this.logger.error("Failed to apply {}", entry, e);
                // Surface the real cause instead of falling through to a null result or a
                // misleading "Cannot commit index" error on the next loop iteration.
                return Futures.exceptionalFuture(e);
            } finally {
                this.raft.setLastApplied(nextIndex);
            }
        }

        this.logger.error("Cannot commit index " + j);
        return Futures.exceptionalFuture(new IndexOutOfBoundsException("Cannot commit index " + j));
    }

    /**
     * Applies a single log entry by dispatching on its entry type.
     *
     * <p>Handled types are query, command, open-session, keep-alive, close-session, metadata,
     * initialize and configuration entries. Any other type yields a future failed with
     * {@link RaftException.ProtocolException}.
     *
     * @param indexed the indexed log entry to apply
     * @param <T> the result type expected by the caller; the cast is unchecked
     * @return the future produced by the type-specific handler
     */
    @SuppressWarnings("unchecked")
    public <T> CompletableFuture<T> apply(Indexed<? extends RaftLogEntry> indexed) {
        this.logger.trace("Applying {}", indexed);

        if (indexed.type() == QueryEntry.class) {
            return (CompletableFuture<T>) applyQuery(indexed.cast());
        }
        if (indexed.type() == CommandEntry.class) {
            return (CompletableFuture<T>) applyCommand(indexed.cast());
        }
        if (indexed.type() == OpenSessionEntry.class) {
            return (CompletableFuture<T>) applyOpenSession(indexed.cast());
        }
        if (indexed.type() == KeepAliveEntry.class) {
            return (CompletableFuture<T>) applyKeepAlive(indexed.cast());
        }
        if (indexed.type() == CloseSessionEntry.class) {
            return (CompletableFuture<T>) applyCloseSession(indexed.cast());
        }
        if (indexed.type() == MetadataEntry.class) {
            return (CompletableFuture<T>) applyMetadata(indexed.cast());
        }
        if (indexed.type() == InitializeEntry.class) {
            return (CompletableFuture<T>) applyInitialize(indexed.cast());
        }
        if (indexed.type() == ConfigurationEntry.class) {
            return (CompletableFuture<T>) applyConfiguration(indexed.cast());
        }
        return Futures.exceptionalFuture(new RaftException.ProtocolException("Unknown entry type", new Object[0]));
    }

    /**
     * Restores every service that has a snapshot stored at the given index.
     *
     * <p>Each snapshot's reader is opened, passed to {@link #restoreService(SnapshotReader)} and
     * closed again, even when restoring throws. Nothing happens when the snapshot store returns
     * {@code null} for the index.
     *
     * @param j the log index whose snapshots should be restored
     */
    private void restoreIndex(long j) {
        Collection<Snapshot> snapshots = this.raft.getSnapshotStore().getSnapshotsByIndex(j);
        if (snapshots == null) {
            return;
        }
        for (Snapshot snapshot : snapshots) {
            // try-with-resources closes the reader exactly once and attaches any close() failure
            // as a suppressed exception of the primary one.
            try (SnapshotReader snapshotReader = snapshot.openReader()) {
                restoreService(snapshotReader);
            }
        }
    }

    /**
     * Restores one service and its sessions from a snapshot.
     *
     * <p>The snapshot is read in this order: the primitive id (a long), the primitive type name,
     * then the service name. A new service context is initialized from those values and the
     * sessions that follow in the snapshot are restored into it.
     *
     * @param snapshotReader the reader positioned at the start of the service's snapshot
     */
    private void restoreService(SnapshotReader snapshotReader) {
        PrimitiveId serviceId = PrimitiveId.from(snapshotReader.readLong());
        PrimitiveType serviceType = this.raft.getPrimitiveTypes().get(snapshotReader.readString());
        String serviceName = snapshotReader.readString();

        this.logger.debug("Restoring service {} {}", serviceId, serviceName);

        RaftServiceContext service = initializeService(serviceId, serviceType, serviceName);
        if (service != null) {
            restoreSessions(snapshotReader, service);
        }
    }

    /**
     * Restores all sessions recorded for a service in a snapshot.
     *
     * <p>The snapshot stores the session count as an int followed by that many session records.
     *
     * @param snapshotReader the reader positioned at the session count
     * @param raftServiceContext the service context the sessions belong to
     */
    private void restoreSessions(SnapshotReader snapshotReader, RaftServiceContext raftServiceContext) {
        // Count down from the recorded session count; a non-positive count restores nothing.
        for (int remaining = snapshotReader.readInt(); remaining > 0; remaining--) {
            restoreSession(snapshotReader, raftServiceContext);
        }
    }

    /**
     * Restores a single session from a snapshot and registers it with the session registry.
     *
     * <p>Values are consumed from the reader strictly in the order they appear below; the order of
     * the read calls must not be changed.
     *
     * @param snapshotReader the reader positioned at the start of a session record
     * @param raftServiceContext the service context that owns the session
     */
    private void restoreSession(SnapshotReader snapshotReader, RaftServiceContext raftServiceContext) {
        SessionId sessionId = SessionId.from(snapshotReader.readLong());
        this.logger.trace("Restoring session {} for {}", sessionId, raftServiceContext.serviceName());

        NodeId nodeId = NodeId.from(snapshotReader.readString());
        RaftSession restored = new RaftSession(
                sessionId,
                nodeId,
                raftServiceContext.serviceName(),
                raftServiceContext.serviceType(),
                ReadConsistency.valueOf(snapshotReader.readString()),
                // The next three longs occupy the constructor positions that applyOpenSession
                // fills with minTimeout, maxTimeout and timestamp.
                snapshotReader.readLong(),
                snapshotReader.readLong(),
                snapshotReader.readLong(),
                raftServiceContext,
                this.raft,
                this.threadContextFactory);

        restored.setRequestSequence(snapshotReader.readLong());
        restored.setCommandSequence(snapshotReader.readLong());
        restored.setEventIndex(snapshotReader.readLong());
        restored.setLastCompleted(snapshotReader.readLong());
        // The session's last applied index is the index of the snapshot it was restored from.
        restored.setLastApplied(snapshotReader.snapshot().index());

        this.raft.getSessions().addSession(restored);
    }

    /**
     * Applies an initialize entry by invoking {@code keepAliveSessions} on every registered
     * service with the entry's index and timestamp.
     *
     * @param indexed the indexed initialize entry
     * @return a future already completed with {@code null}
     */
    private CompletableFuture<Void> applyInitialize(Indexed<InitializeEntry> indexed) {
        this.raft.getServices().iterator().forEachRemaining(
                service -> service.keepAliveSessions(indexed.index(), indexed.entry().timestamp()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a configuration entry by invoking {@code keepAliveSessions} on every registered
     * service with the entry's index and timestamp.
     *
     * @param indexed the indexed configuration entry
     * @return a future already completed with {@code null}
     */
    private CompletableFuture<Void> applyConfiguration(Indexed<ConfigurationEntry> indexed) {
        this.raft.getServices().iterator().forEachRemaining(
                service -> service.keepAliveSessions(indexed.index(), indexed.entry().timestamp()));
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Applies a keep-alive entry to every session it names.
     *
     * <p>For each session id in the entry that resolves to a known session, the owning service's
     * {@code keepAlive} is invoked with the matching command sequence number and event index.
     * Unknown session ids are skipped. After all keep-alives have been issued,
     * {@code completeKeepAlive} is invoked on every registered service.
     *
     * @param indexed the indexed keep-alive entry
     * @return a future that completes, once all per-session keep-alives have completed, with the
     *     ids of the sessions whose keep-alive reported {@code true}
     */
    private CompletableFuture<long[]> applyKeepAlive(Indexed<KeepAliveEntry> indexed) {
        long[] sessionIds = indexed.entry().sessionIds();
        long[] commandSequences = indexed.entry().commandSequenceNumbers();
        long[] eventIndexes = indexed.entry().eventIndexes();

        Collection<Long> aliveSessionIds = new ArrayList<>(sessionIds.length);
        Collection<CompletableFuture<Void>> pending = new ArrayList<>(sessionIds.length);

        for (int i = 0; i < sessionIds.length; i++) {
            long sessionId = sessionIds[i];
            long commandSequence = commandSequences[i];
            long eventIndex = eventIndexes[i];

            RaftSession session = this.raft.getSessions().getSession(sessionId);
            if (session == null) {
                continue;
            }
            pending.add(session.getService()
                    .keepAlive(indexed.index(), indexed.entry().timestamp(), session, commandSequence, eventIndex)
                    .thenAccept(succeeded -> {
                        if (succeeded.booleanValue()) {
                            // Guard the shared list in case callbacks run on different threads.
                            synchronized (aliveSessionIds) {
                                aliveSessionIds.add(sessionId);
                            }
                        }
                    }));
        }

        Iterator<RaftServiceContext> services = this.raft.getServices().iterator();
        while (services.hasNext()) {
            services.next().completeKeepAlive(indexed.index(), indexed.entry().timestamp());
        }

        return CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[pending.size()]))
                .thenApply(ignored -> {
                    synchronized (aliveSessionIds) {
                        return Longs.toArray(aliveSessionIds);
                    }
                });
    }

    /**
     * Returns the registered service with the given name, initializing a new one if none exists.
     *
     * @param primitiveId the id to give a newly initialized service
     * @param primitiveType the type to give a newly initialized service
     * @param str the service name used for the lookup and for a newly initialized service
     * @return the existing service context, or the newly initialized one
     */
    private RaftServiceContext getOrInitializeService(PrimitiveId primitiveId, PrimitiveType primitiveType, String str) {
        RaftServiceContext existing = this.raft.getServices().getService(str);
        return existing != null ? existing : initializeService(primitiveId, primitiveType, str);
    }

    /**
     * Creates and registers a new service context under the given name.
     *
     * <p>If a service was already registered under that name before the new one was registered,
     * the sessions of the previous service are removed from the session registry afterwards.
     *
     * @param primitiveId the id of the new service
     * @param primitiveType the primitive type, also used to create the service instance
     * @param str the service name
     * @return the newly created and registered service context
     */
    private RaftServiceContext initializeService(PrimitiveId primitiveId, PrimitiveType primitiveType, String str) {
        // Look up the previous registration before it is replaced.
        RaftServiceContext previous = this.raft.getServices().getService(str);

        RaftServiceContext created = new RaftServiceContext(
                primitiveId, str, primitiveType, primitiveType.newService(), this.raft, this.threadContextFactory);
        this.raft.getServices().registerService(created);

        if (previous == null) {
            return created;
        }
        this.raft.getSessions().removeSessions(previous.serviceId());
        return created;
    }

    /**
     * Applies an open-session entry: resolves (or initializes) the target service, registers a new
     * session and opens it on the service.
     *
     * <p>The entry's index is used both as the id of a newly initialized service and as the id of
     * the new session.
     *
     * @param indexed the indexed open-session entry
     * @return the future returned by the service's {@code openSession}, or a future failed with
     *     {@link RaftException.UnknownService} when the entry's service type is not registered or
     *     no service could be obtained
     */
    private CompletableFuture<Long> applyOpenSession(Indexed<OpenSessionEntry> indexed) {
        PrimitiveType primitiveType = this.raft.getPrimitiveTypes().get(indexed.entry().serviceType());
        if (primitiveType == null) {
            return Futures.exceptionalFuture(new RaftException.UnknownService("Unknown service type " + indexed.entry().serviceType(), new Object[0]));
        }

        // Reuse the type validated above rather than looking it up a second time.
        RaftServiceContext service = getOrInitializeService(
                PrimitiveId.from(indexed.index()), primitiveType, indexed.entry().serviceName());
        if (service == null) {
            return Futures.exceptionalFuture(new RaftException.UnknownService("Unknown service type " + indexed.entry().serviceType(), new Object[0]));
        }

        return service.openSession(
                indexed.index(),
                indexed.entry().timestamp(),
                this.raft.getSessions().addSession(new RaftSession(
                        SessionId.from(indexed.index()),
                        NodeId.from(indexed.entry().memberId()),
                        indexed.entry().serviceName(),
                        primitiveType,
                        indexed.entry().readConsistency(),
                        indexed.entry().minTimeout(),
                        indexed.entry().maxTimeout(),
                        indexed.entry().timestamp(),
                        service,
                        this.raft,
                        this.threadContextFactory)));
    }

    /**
     * Applies a close-session entry by closing the session on its owning service.
     *
     * @param indexed the indexed close-session entry
     * @return the future returned by the service's {@code closeSession}, or a future failed with
     *     {@link RaftException.UnknownSession} when the session is not registered
     */
    private CompletableFuture<Void> applyCloseSession(Indexed<CloseSessionEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(indexed.entry().session());
        if (session != null) {
            return session.getService().closeSession(
                    indexed.index(), indexed.entry().timestamp(), session, indexed.entry().expired());
        }
        return Futures.exceptionalFuture(new RaftException.UnknownSession("Unknown session: " + indexed.entry().session(), new Object[0]));
    }

    /**
     * Applies a metadata entry by collecting metadata about registered sessions.
     *
     * <p>When the entry's session id is not positive, metadata for every registered session is
     * returned. Otherwise only sessions whose service name equals that of the entry's session are
     * included.
     *
     * @param indexed the indexed metadata entry
     * @return a completed future holding the collected session metadata, or a future failed with
     *     {@link RaftException.UnknownSession} when a positive session id is not registered
     */
    private CompletableFuture<MetadataResult> applyMetadata(Indexed<MetadataEntry> indexed) {
        long sessionId = indexed.entry().session();

        if (sessionId <= 0) {
            // No specific session requested: report every session.
            HashSet<SessionMetadata> allSessions = new HashSet<>();
            for (RaftSession candidate : this.raft.getSessions().getSessions()) {
                allSessions.add(new SessionMetadata(candidate.sessionId().id().longValue(), candidate.serviceName(), candidate.serviceType().id()));
            }
            return CompletableFuture.completedFuture(new MetadataResult(allSessions));
        }

        RaftSession session = this.raft.getSessions().getSession(sessionId);
        if (session == null) {
            this.logger.warn("Unknown session: " + sessionId);
            return Futures.exceptionalFuture(new RaftException.UnknownSession("Unknown session: " + sessionId, new Object[0]));
        }

        // Restrict the result to sessions of the same service as the requesting session.
        HashSet<SessionMetadata> serviceSessions = new HashSet<>();
        for (RaftSession candidate : this.raft.getSessions().getSessions()) {
            if (candidate.serviceName().equals(session.serviceName())) {
                serviceSessions.add(new SessionMetadata(candidate.sessionId().id().longValue(), candidate.serviceName(), candidate.serviceType().id()));
            }
        }
        return CompletableFuture.completedFuture(new MetadataResult(serviceSessions));
    }

    /**
     * Applies a command entry by executing its operation on the service that owns the session.
     *
     * <p>An event is recorded on the load monitor before the command is executed.
     *
     * @param indexed the indexed command entry
     * @return the future returned by the service's {@code executeCommand}, or a future failed with
     *     {@link RaftException.UnknownSession} when the session is not registered
     */
    private CompletableFuture<OperationResult> applyCommand(Indexed<CommandEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(indexed.entry().session());
        if (session != null) {
            this.raft.getLoadMonitor().recordEvent();
            return session.getService().executeCommand(
                    indexed.index(),
                    indexed.entry().sequenceNumber(),
                    indexed.entry().timestamp(),
                    session,
                    indexed.entry().operation());
        }
        this.logger.warn("Unknown session: " + indexed.entry().session());
        return Futures.exceptionalFuture(new RaftException.UnknownSession("unknown session: " + indexed.entry().session(), new Object[0]));
    }

    /**
     * Applies a query entry by executing its operation on the service that owns the session.
     *
     * @param indexed the indexed query entry
     * @return the future returned by the service's {@code executeQuery}, or a future failed with
     *     {@link RaftException.UnknownSession} when the session is not registered
     */
    private CompletableFuture<OperationResult> applyQuery(Indexed<QueryEntry> indexed) {
        RaftSession session = this.raft.getSessions().getSession(indexed.entry().session());
        if (session == null) {
            this.logger.warn("Unknown session: " + indexed.entry().session());
            return Futures.exceptionalFuture(new RaftException.UnknownSession("unknown session " + indexed.entry().session(), new Object[0]));
        }
        return session.getService().executeQuery(
                indexed.index(),
                indexed.entry().sequenceNumber(),
                indexed.entry().timestamp(),
                session,
                indexed.entry().operation());
    }

    /**
     * Closes the service manager. The current implementation performs no work.
     *
     * <p>NOTE(review): the log reader opened in the constructor is not released here — confirm
     * whether its lifetime is owned by the log itself or whether it should be closed in this
     * method.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
    }
}
