package org.apache.hadoop.ozone.container.common.transport.server.ratis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.Cache;
import org.apache.hadoop.hdds.utils.ResourceLimitCache;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.util.Time;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachineStorage;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.class */
public class ContainerStateMachine extends BaseStateMachine {
    static final Logger LOG = LoggerFactory.getLogger(ContainerStateMachine.class);
    private final RaftGroupId gid;
    private final ContainerDispatcher dispatcher;
    private final ContainerController containerController;
    private final XceiverServerRatis ratisServer;
    private final ExecutorService[] executors;
    private final List<ThreadPoolExecutor> chunkExecutors;
    private final Cache<Long, ByteString> stateMachineDataCache;
    private final AtomicBoolean stateMachineHealthy;
    private final Semaphore applyTransactionSemaphore;
    private final CSMMetrics metrics;
    private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
    private final ConcurrentHashMap<Long, CompletableFuture<ContainerProtos.ContainerCommandResponseProto>> writeChunkFutureMap = new ConcurrentHashMap<>();
    private final Map<Long, Long> applyTransactionCompletionMap = new ConcurrentHashMap();
    private final Map<Long, Long> container2BCSIDMap = new ConcurrentHashMap();

    /* renamed from: org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$hdds$protocol$datanode$proto$ContainerProtos$Type = new int[ContainerProtos.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$hadoop$hdds$protocol$datanode$proto$ContainerProtos$Type[ContainerProtos.Type.WriteChunk.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
        }
    }

    public ContainerStateMachine(RaftGroupId raftGroupId, ContainerDispatcher containerDispatcher, ContainerController containerController, List<ThreadPoolExecutor> list, XceiverServerRatis xceiverServerRatis, ConfigurationSource configurationSource) {
        this.gid = raftGroupId;
        this.dispatcher = containerDispatcher;
        this.containerController = containerController;
        this.ratisServer = xceiverServerRatis;
        this.metrics = CSMMetrics.create(raftGroupId);
        this.stateMachineDataCache = new ResourceLimitCache(new ConcurrentHashMap(), (l, byteString) -> {
            return new int[]{1, byteString.size()};
        }, new int[]{((DatanodeRatisServerConfig) configurationSource.getObject(DatanodeRatisServerConfig.class)).getLeaderNumPendingRequests(), (int) configurationSource.getStorageSize("dfs.container.ratis.leader.pending.bytes.limit", "1GB", StorageUnit.BYTES)});
        this.chunkExecutors = list;
        int i = configurationSource.getInt("dfs.container.ratis.num.container.op.executors", 10);
        this.applyTransactionSemaphore = new Semaphore(configurationSource.getInt("dfs.container.ratis.statemachine.max.pending.apply-transactions", 100000));
        this.stateMachineHealthy = new AtomicBoolean(true);
        this.executors = new ExecutorService[i];
        for (int i2 = 0; i2 < i; i2++) {
            int i3 = i2;
            this.executors[i3] = Executors.newSingleThreadExecutor(runnable -> {
                Thread thread = new Thread(runnable);
                thread.setName("RatisApplyTransactionExecutor " + i3);
                return thread;
            });
        }
    }

    public StateMachineStorage getStateMachineStorage() {
        return this.storage;
    }

    public CSMMetrics getMetrics() {
        return this.metrics;
    }

    public void initialize(RaftServer raftServer, RaftGroupId raftGroupId, RaftStorage raftStorage) throws IOException {
        super.initialize(raftServer, raftGroupId, raftStorage);
        this.storage.init(raftStorage);
        this.ratisServer.notifyGroupAdd(this.gid);
        loadSnapshot(this.storage.getLatestSnapshot());
    }

    private long loadSnapshot(SingleFileSnapshotInfo singleFileSnapshotInfo) throws IOException {
        if (singleFileSnapshotInfo == null) {
            TermIndex valueOf = TermIndex.valueOf(0L, -1L);
            LOG.info("{}: The snapshot info is null. Setting the last applied indexto:{}", this.gid, valueOf);
            setLastAppliedTermIndex(valueOf);
            return valueOf.getIndex();
        }
        File file = singleFileSnapshotInfo.getFile().getPath().toFile();
        TermIndex termIndexFromSnapshotFile = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(file);
        LOG.info("{}: Setting the last applied index to {}", this.gid, termIndexFromSnapshotFile);
        setLastAppliedTermIndex(termIndexFromSnapshotFile);
        buildMissingContainerSet(file);
        return termIndexFromSnapshotFile.getIndex();
    }

    @VisibleForTesting
    public void buildMissingContainerSet(File file) throws IOException {
        FileInputStream fileInputStream = new FileInputStream(file);
        Throwable th = null;
        try {
            try {
                this.container2BCSIDMap.putAll(ContainerProtos.Container2BCSIDMapProto.parseFrom(fileInputStream).getContainer2BCSIDMap());
                this.dispatcher.buildMissingContainerSetAndValidate(this.container2BCSIDMap);
                if (fileInputStream != null) {
                    if (0 == 0) {
                        fileInputStream.close();
                        return;
                    }
                    try {
                        fileInputStream.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th4;
        }
    }

    public void persistContainerSet(OutputStream outputStream) throws IOException {
        ContainerProtos.Container2BCSIDMapProto.Builder newBuilder = ContainerProtos.Container2BCSIDMapProto.newBuilder();
        newBuilder.putAllContainer2BCSID(this.container2BCSIDMap);
        newBuilder.build().writeTo(outputStream);
    }

    public boolean isStateMachineHealthy() {
        return this.stateMachineHealthy.get();
    }

    public long takeSnapshot() throws IOException {
        TermIndex lastAppliedTermIndex = getLastAppliedTermIndex();
        long monotonicNow = Time.monotonicNow();
        if (!isStateMachineHealthy()) {
            String str = "Failed to take snapshot  for " + this.gid + " as the stateMachine is unhealthy. The last applied index is at " + lastAppliedTermIndex;
            StateMachineException stateMachineException = new StateMachineException(str);
            LOG.error(str);
            throw stateMachineException;
        }
        if (lastAppliedTermIndex == null || lastAppliedTermIndex.getIndex() == -1) {
            return -1L;
        }
        File snapshotFile = this.storage.getSnapshotFile(lastAppliedTermIndex.getTerm(), lastAppliedTermIndex.getIndex());
        LOG.info("{}: Taking a snapshot at:{} file {}", new Object[]{this.gid, lastAppliedTermIndex, snapshotFile});
        try {
            FileOutputStream fileOutputStream = new FileOutputStream(snapshotFile);
            Throwable th = null;
            try {
                try {
                    persistContainerSet(fileOutputStream);
                    fileOutputStream.flush();
                    fileOutputStream.getFD().sync();
                    if (fileOutputStream != null) {
                        if (0 != 0) {
                            try {
                                fileOutputStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            fileOutputStream.close();
                        }
                    }
                    LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms", new Object[]{this.gid, lastAppliedTermIndex, snapshotFile, Long.valueOf(Time.monotonicNow() - monotonicNow)});
                    return lastAppliedTermIndex.getIndex();
                } finally {
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (IOException e) {
            LOG.error("{}: Failed to write snapshot at:{} file {}", new Object[]{this.gid, lastAppliedTermIndex, snapshotFile});
            throw e;
        }
    }

    public TransactionContext startTransaction(RaftClientRequest raftClientRequest) throws IOException {
        long monotonicNowNanos = Time.monotonicNowNanos();
        ContainerProtos.ContainerCommandRequestProto message2ContainerCommandRequestProto = message2ContainerCommandRequestProto(raftClientRequest.getMessage());
        Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(this.gid));
        try {
            this.dispatcher.validateContainerCommand(message2ContainerCommandRequestProto);
            if (message2ContainerCommandRequestProto.getCmdType() != ContainerProtos.Type.WriteChunk) {
                return TransactionContext.newBuilder().setClientRequest(raftClientRequest).setStateMachine(this).setServerRole(RaftProtos.RaftPeerRole.LEADER).setStateMachineContext(Long.valueOf(monotonicNowNanos)).setLogData(message2ContainerCommandRequestProto.toByteString()).build();
            }
            ContainerProtos.WriteChunkRequestProto writeChunk = message2ContainerCommandRequestProto.getWriteChunk();
            return TransactionContext.newBuilder().setClientRequest(raftClientRequest).setStateMachine(this).setServerRole(RaftProtos.RaftPeerRole.LEADER).setStateMachineContext(Long.valueOf(monotonicNowNanos)).setStateMachineData(writeChunk.getData()).setLogData(ContainerProtos.ContainerCommandRequestProto.newBuilder(message2ContainerCommandRequestProto).setWriteChunk(ContainerProtos.WriteChunkRequestProto.newBuilder().setBlockID(writeChunk.getBlockID()).setChunkData(writeChunk.getChunkData()).build()).setTraceID(message2ContainerCommandRequestProto.getTraceID()).build().toByteString()).build();
        } catch (IOException e) {
            if (e instanceof ContainerNotOpenException) {
                this.metrics.incNumContainerNotOpenVerifyFailures();
            } else {
                this.metrics.incNumStartTransactionVerifyFailures();
                LOG.error("startTransaction validation failed on leader", e);
            }
            TransactionContext build = TransactionContext.newBuilder().setClientRequest(raftClientRequest).setStateMachine(this).setServerRole(RaftProtos.RaftPeerRole.LEADER).build();
            build.setException(e);
            return build;
        }
    }

    private ByteString getStateMachineData(RaftProtos.StateMachineLogEntryProto stateMachineLogEntryProto) {
        return stateMachineLogEntryProto.getStateMachineEntry().getStateMachineData();
    }

    private static ContainerProtos.ContainerCommandRequestProto getContainerCommandRequestProto(RaftGroupId raftGroupId, ByteString byteString) throws InvalidProtocolBufferException {
        return ContainerProtos.ContainerCommandRequestProto.newBuilder(ContainerProtos.ContainerCommandRequestProto.parseFrom(byteString)).setPipelineID(raftGroupId.getUuid().toString()).build();
    }

    private ContainerProtos.ContainerCommandRequestProto message2ContainerCommandRequestProto(Message message) throws InvalidProtocolBufferException {
        return ContainerCommandRequestMessage.toProto(message.getContent(), this.gid);
    }

    private ContainerProtos.ContainerCommandResponseProto dispatchCommand(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, DispatcherContext dispatcherContext) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", new Object[]{this.gid, containerCommandRequestProto.getCmdType(), Long.valueOf(containerCommandRequestProto.getContainerID()), containerCommandRequestProto.getPipelineID(), containerCommandRequestProto.getTraceID()});
        }
        ContainerProtos.ContainerCommandResponseProto dispatch = this.dispatcher.dispatch(containerCommandRequestProto, dispatcherContext);
        if (LOG.isTraceEnabled()) {
            LOG.trace("{}: response {}", this.gid, dispatch);
        }
        return dispatch;
    }

    private ContainerProtos.ContainerCommandResponseProto runCommand(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, DispatcherContext dispatcherContext) {
        return dispatchCommand(containerCommandRequestProto, dispatcherContext);
    }

    private ExecutorService getCommandExecutor(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto) {
        return this.executors[(int) (containerCommandRequestProto.getContainerID() % this.executors.length)];
    }

    private CompletableFuture<Message> handleWriteChunk(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, long j, long j2, long j3) {
        ContainerProtos.WriteChunkRequestProto writeChunk = containerCommandRequestProto.getWriteChunk();
        try {
            if (this.ratisServer.getServerDivision().getInfo().isLeader()) {
                this.stateMachineDataCache.put(Long.valueOf(j), writeChunk.getData());
            }
            DispatcherContext build = new DispatcherContext.Builder().setTerm(j2).setLogIndex(j).setStage(DispatcherContext.WriteChunkStage.WRITE_DATA).setContainer2BCSIDMap(this.container2BCSIDMap).build();
            CompletableFuture<Message> completableFuture = new CompletableFuture<>();
            CompletableFuture<ContainerProtos.ContainerCommandResponseProto> supplyAsync = CompletableFuture.supplyAsync(() -> {
                try {
                    return runCommand(containerCommandRequestProto, build);
                } catch (Exception e) {
                    LOG.error("{}: writeChunk writeStateMachineData failed: blockId{} logIndex {} chunkName {}", new Object[]{this.gid, writeChunk.getBlockID(), Long.valueOf(j), writeChunk.getChunkData().getChunkName(), e});
                    this.metrics.incNumWriteDataFails();
                    this.stateMachineHealthy.set(false);
                    completableFuture.completeExceptionally(e);
                    throw e;
                }
            }, getChunkExecutor(containerCommandRequestProto.getWriteChunk()));
            this.writeChunkFutureMap.put(Long.valueOf(j), supplyAsync);
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: writeChunk writeStateMachineData : blockId{} logIndex {} chunkName {}", new Object[]{this.gid, writeChunk.getBlockID(), Long.valueOf(j), writeChunk.getChunkData().getChunkName()});
            }
            supplyAsync.thenApply(containerCommandResponseProto -> {
                if (containerCommandResponseProto.getResult() == ContainerProtos.Result.SUCCESS || containerCommandResponseProto.getResult() == ContainerProtos.Result.CONTAINER_NOT_OPEN || containerCommandResponseProto.getResult() == ContainerProtos.Result.CLOSED_CONTAINER_IO) {
                    this.metrics.incNumBytesWrittenCount(containerCommandRequestProto.getWriteChunk().getChunkData().getLen());
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(this.gid + ": writeChunk writeStateMachineData  completed: blockId" + writeChunk.getBlockID() + " logIndex " + j + " chunkName " + writeChunk.getChunkData().getChunkName());
                    }
                    containerCommandResponseProto.getClass();
                    completableFuture.complete(containerCommandResponseProto::toByteString);
                    this.metrics.recordWriteStateMachineCompletion(Time.monotonicNowNanos() - j3);
                } else {
                    StorageContainerException storageContainerException = new StorageContainerException(containerCommandResponseProto.getMessage(), containerCommandResponseProto.getResult());
                    LOG.error(this.gid + ": writeChunk writeStateMachineData failed: blockId" + writeChunk.getBlockID() + " logIndex " + j + " chunkName " + writeChunk.getChunkData().getChunkName() + " Error message: " + containerCommandResponseProto.getMessage() + " Container Result: " + containerCommandResponseProto.getResult());
                    this.metrics.incNumWriteDataFails();
                    this.stateMachineHealthy.set(false);
                    completableFuture.completeExceptionally(storageContainerException);
                }
                this.writeChunkFutureMap.remove(Long.valueOf(j));
                return containerCommandResponseProto;
            });
            return completableFuture;
        } catch (IOException e) {
            return completeExceptionally(e);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            return completeExceptionally(e2);
        }
    }

    private ExecutorService getChunkExecutor(ContainerProtos.WriteChunkRequestProto writeChunkRequestProto) {
        int hashCode = Objects.hashCode(writeChunkRequestProto.getBlockID());
        if (hashCode == Integer.MIN_VALUE) {
            hashCode = Integer.MAX_VALUE;
        }
        return this.chunkExecutors.get(Math.abs(hashCode) % this.chunkExecutors.size());
    }

    public CompletableFuture<Message> write(RaftProtos.LogEntryProto logEntryProto) {
        try {
            this.metrics.incNumWriteStateMachineOps();
            long monotonicNowNanos = Time.monotonicNowNanos();
            ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto = getContainerCommandRequestProto(this.gid, logEntryProto.getStateMachineLogEntry().getLogData());
            ContainerProtos.ContainerCommandRequestProto build = ContainerProtos.ContainerCommandRequestProto.newBuilder(containerCommandRequestProto).setWriteChunk(ContainerProtos.WriteChunkRequestProto.newBuilder(containerCommandRequestProto.getWriteChunk()).setData(getStateMachineData(logEntryProto.getStateMachineLogEntry())).build()).build();
            ContainerProtos.Type cmdType = build.getCmdType();
            switch (AnonymousClass1.$SwitchMap$org$apache$hadoop$hdds$protocol$datanode$proto$ContainerProtos$Type[cmdType.ordinal()]) {
                case 1:
                    return handleWriteChunk(build, logEntryProto.getIndex(), logEntryProto.getTerm(), monotonicNowNanos);
                default:
                    throw new IllegalStateException("Cmd Type:" + cmdType + " should not have state machine data");
            }
        } catch (IOException e) {
            this.metrics.incNumWriteStateMachineFails();
            return completeExceptionally(e);
        }
    }

    public CompletableFuture<Message> query(Message message) {
        try {
            this.metrics.incNumQueryStateMachineOps();
            ContainerProtos.ContainerCommandResponseProto runCommand = runCommand(message2ContainerCommandRequestProto(message), null);
            runCommand.getClass();
            return CompletableFuture.completedFuture(runCommand::toByteString);
        } catch (IOException e) {
            this.metrics.incNumQueryStateMachineFails();
            return completeExceptionally(e);
        }
    }

    private ByteString readStateMachineData(ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto, long j, long j2) throws IOException {
        this.metrics.incNumReadStateMachineMissCount();
        ContainerProtos.WriteChunkRequestProto writeChunk = containerCommandRequestProto.getWriteChunk();
        ContainerProtos.ChunkInfo chunkData = writeChunk.getChunkData();
        ContainerProtos.ContainerCommandResponseProto dispatchCommand = dispatchCommand(ContainerProtos.ContainerCommandRequestProto.newBuilder(containerCommandRequestProto).setCmdType(ContainerProtos.Type.ReadChunk).setReadChunk(ContainerProtos.ReadChunkRequestProto.newBuilder().setBlockID(writeChunk.getBlockID()).setChunkData(chunkData).setReadChunkVersion(ContainerProtos.ReadChunkVersion.V1)).build(), new DispatcherContext.Builder().setTerm(j).setLogIndex(j2).setReadFromTmpFile(true).build());
        if (dispatchCommand.getResult() != ContainerProtos.Result.SUCCESS) {
            StorageContainerException storageContainerException = new StorageContainerException(dispatchCommand.getMessage(), dispatchCommand.getResult());
            LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : {} Container Result: {}", new Object[]{this.gid, dispatchCommand.getCmdType(), Long.valueOf(j2), dispatchCommand.getMessage(), dispatchCommand.getResult()});
            this.stateMachineHealthy.set(false);
            throw storageContainerException;
        }
        ContainerProtos.ReadChunkResponseProto readChunk = dispatchCommand.getReadChunk();
        ByteString data = readChunk.hasData() ? readChunk.getData() : BufferUtils.concatByteStrings(readChunk.getDataBuffers().getBuffersList());
        Preconditions.checkNotNull(data, "read chunk data is null for chunk: %s", chunkData);
        Preconditions.checkState(((long) data.size()) == chunkData.getLen(), "read chunk len=%s does not match chunk expected len=%s for chunk:%s", Integer.valueOf(data.size()), Long.valueOf(chunkData.getLen()), chunkData);
        return data;
    }

    public CompletableFuture<Void> flush(long j) {
        List list = (List) this.writeChunkFutureMap.entrySet().stream().filter(entry -> {
            return ((Long) entry.getKey()).longValue() <= j;
        }).map((v0) -> {
            return v0.getValue();
        }).collect(Collectors.toList());
        return CompletableFuture.allOf((CompletableFuture[]) list.toArray(new CompletableFuture[list.size()]));
    }

    public CompletableFuture<ByteString> read(RaftProtos.LogEntryProto logEntryProto) {
        RaftProtos.StateMachineLogEntryProto stateMachineLogEntry = logEntryProto.getStateMachineLogEntry();
        this.metrics.incNumReadStateMachineOps();
        if (!getStateMachineData(stateMachineLogEntry).isEmpty()) {
            return CompletableFuture.completedFuture(ByteString.EMPTY);
        }
        try {
            ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto = getContainerCommandRequestProto(this.gid, logEntryProto.getStateMachineLogEntry().getLogData());
            Preconditions.checkArgument(!HddsUtils.isReadOnly(containerCommandRequestProto));
            if (containerCommandRequestProto.getCmdType() != ContainerProtos.Type.WriteChunk) {
                throw new IllegalStateException("Cmd type:" + containerCommandRequestProto.getCmdType() + " cannot have state machine data");
            }
            CompletableFuture<ByteString> completableFuture = new CompletableFuture<>();
            ByteString byteString = (ByteString) this.stateMachineDataCache.get(Long.valueOf(logEntryProto.getIndex()));
            if (byteString != null) {
                completableFuture.complete(byteString);
                return completableFuture;
            }
            CompletableFuture.supplyAsync(() -> {
                try {
                    completableFuture.complete(readStateMachineData(containerCommandRequestProto, logEntryProto.getTerm(), logEntryProto.getIndex()));
                } catch (IOException e) {
                    this.metrics.incNumReadStateMachineFails();
                    completableFuture.completeExceptionally(e);
                }
                return completableFuture;
            }, getChunkExecutor(containerCommandRequestProto.getWriteChunk()));
            return completableFuture;
        } catch (Exception e) {
            this.metrics.incNumReadStateMachineFails();
            LOG.error("{} unable to read stateMachineData:", this.gid, e);
            return completeExceptionally(e);
        }
    }

    private synchronized void updateLastApplied() {
        Long l = null;
        long j = -1;
        long index = getLastAppliedTermIndex().getIndex();
        while (true) {
            long j2 = index + 1;
            Long remove = this.applyTransactionCompletionMap.remove(Long.valueOf(j2));
            if (remove == null) {
                break;
            }
            l = remove;
            j = j2;
            index = j2;
        }
        if (l != null) {
            updateLastAppliedTermIndex(l.longValue(), j);
        }
    }

    public void notifyTermIndexUpdated(long j, long j2) {
        this.applyTransactionCompletionMap.put(Long.valueOf(j2), Long.valueOf(j));
        updateLastApplied();
    }

    public CompletableFuture<Message> applyTransaction(TransactionContext transactionContext) {
        long index = transactionContext.getLogEntry().getIndex();
        this.stateMachineDataCache.remove(Long.valueOf(index));
        DispatcherContext.Builder logIndex = new DispatcherContext.Builder().setTerm(transactionContext.getLogEntry().getTerm()).setLogIndex(index);
        long monotonicNowNanos = Time.monotonicNowNanos();
        try {
            this.applyTransactionSemaphore.acquire();
            this.metrics.incNumApplyTransactionsOps();
            ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto = getContainerCommandRequestProto(this.gid, transactionContext.getStateMachineLogEntry().getLogData());
            ContainerProtos.Type cmdType = containerCommandRequestProto.getCmdType();
            if (cmdType == ContainerProtos.Type.WriteChunk) {
                Preconditions.checkArgument(containerCommandRequestProto.getWriteChunk().getData().isEmpty());
                logIndex.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
            }
            if (cmdType == ContainerProtos.Type.WriteChunk || cmdType == ContainerProtos.Type.PutSmallFile || cmdType == ContainerProtos.Type.PutBlock || cmdType == ContainerProtos.Type.CreateContainer) {
                logIndex.setContainer2BCSIDMap(this.container2BCSIDMap);
            }
            CompletableFuture<Message> completableFuture = new CompletableFuture<>();
            CompletableFuture.supplyAsync(() -> {
                try {
                    return runCommand(containerCommandRequestProto, logIndex.build());
                } catch (Exception e) {
                    LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex {} exception {}", new Object[]{this.gid, containerCommandRequestProto.getCmdType(), Long.valueOf(index), e});
                    this.stateMachineHealthy.compareAndSet(true, false);
                    this.metrics.incNumApplyTransactionsFails();
                    completableFuture.completeExceptionally(e);
                    throw e;
                }
            }, getCommandExecutor(containerCommandRequestProto)).thenApply(containerCommandResponseProto -> {
                if (transactionContext.getServerRole() == RaftProtos.RaftPeerRole.LEADER && transactionContext.getStateMachineContext() != null) {
                    this.metrics.incPipelineLatency(cmdType, (Time.monotonicNowNanos() - ((Long) transactionContext.getStateMachineContext()).longValue()) / 1000000);
                }
                if (containerCommandResponseProto.getResult() == ContainerProtos.Result.SUCCESS || containerCommandResponseProto.getResult() == ContainerProtos.Result.CONTAINER_NOT_OPEN || containerCommandResponseProto.getResult() == ContainerProtos.Result.CLOSED_CONTAINER_IO) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : {} Container Result: {}", new Object[]{this.gid, containerCommandResponseProto.getCmdType(), Long.valueOf(index), containerCommandResponseProto.getMessage(), containerCommandResponseProto.getResult()});
                    }
                    containerCommandResponseProto.getClass();
                    completableFuture.complete(containerCommandResponseProto::toByteString);
                    if (cmdType == ContainerProtos.Type.WriteChunk || cmdType == ContainerProtos.Type.PutSmallFile) {
                        this.metrics.incNumBytesCommittedCount(containerCommandRequestProto.getWriteChunk().getChunkData().getLen());
                    }
                    if (isStateMachineHealthy()) {
                        Preconditions.checkState(this.applyTransactionCompletionMap.put(Long.valueOf(index), Long.valueOf(transactionContext.getLogEntry().getTerm())) == null);
                        updateLastApplied();
                    }
                } else {
                    StorageContainerException storageContainerException = new StorageContainerException(containerCommandResponseProto.getMessage(), containerCommandResponseProto.getResult());
                    LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : {} Container Result: {}", new Object[]{this.gid, containerCommandResponseProto.getCmdType(), Long.valueOf(index), containerCommandResponseProto.getMessage(), containerCommandResponseProto.getResult()});
                    this.metrics.incNumApplyTransactionsFails();
                    completableFuture.completeExceptionally(storageContainerException);
                    this.stateMachineHealthy.compareAndSet(true, false);
                    this.ratisServer.handleApplyTransactionFailure(this.gid, transactionContext.getServerRole());
                }
                return completableFuture;
            }).whenComplete((completableFuture2, th) -> {
                if (th != null) {
                    this.stateMachineHealthy.set(false);
                    LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex {} exception {}", new Object[]{this.gid, containerCommandRequestProto.getCmdType(), Long.valueOf(index), th});
                }
                this.applyTransactionSemaphore.release();
                this.metrics.recordApplyTransactionCompletion(Time.monotonicNowNanos() - monotonicNowNanos);
            });
            return completableFuture;
        } catch (IOException e) {
            this.metrics.incNumApplyTransactionsFails();
            return completeExceptionally(e);
        } catch (InterruptedException e2) {
            this.metrics.incNumApplyTransactionsFails();
            Thread.currentThread().interrupt();
            return completeExceptionally(e2);
        }
    }

    private static <T> CompletableFuture<T> completeExceptionally(Exception exc) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(exc);
        return completableFuture;
    }

    public CompletableFuture<Void> truncate(long j) {
        this.stateMachineDataCache.removeIf(l -> {
            return l.longValue() >= j;
        });
        return CompletableFuture.completedFuture(null);
    }

    @VisibleForTesting
    public void evictStateMachineCache() {
        this.stateMachineDataCache.clear();
    }

    public void notifyFollowerSlowness(RaftProtos.RoleInfoProto roleInfoProto) {
        this.ratisServer.handleNodeSlowness(this.gid, roleInfoProto);
    }

    public void notifyExtendedNoLeader(RaftProtos.RoleInfoProto roleInfoProto) {
        this.ratisServer.handleNoLeader(this.gid, roleInfoProto);
    }

    public void notifyLogFailed(Throwable th, RaftProtos.LogEntryProto logEntryProto) {
        this.ratisServer.handleNodeLogFailure(this.gid, th);
    }

    public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(RaftProtos.RoleInfoProto roleInfoProto, TermIndex termIndex) {
        this.ratisServer.handleInstallSnapshotFromLeader(this.gid, roleInfoProto, termIndex);
        CompletableFuture<TermIndex> completableFuture = new CompletableFuture<>();
        completableFuture.complete(termIndex);
        return completableFuture;
    }

    public void notifyGroupRemove() {
        this.ratisServer.notifyGroupRemove(this.gid);
        for (Long l : this.container2BCSIDMap.keySet()) {
            try {
                this.containerController.markContainerForClose(l.longValue());
                this.containerController.quasiCloseContainer(l.longValue());
            } catch (IOException e) {
                LOG.debug("Failed to quasi-close container {}", l);
            }
        }
    }

    public void close() throws IOException {
        evictStateMachineCache();
        for (ExecutorService executorService : this.executors) {
            executorService.shutdown();
        }
        this.metrics.unRegister();
    }

    public void notifyLeaderChanged(RaftGroupMemberId raftGroupMemberId, RaftPeerId raftPeerId) {
        this.ratisServer.handleLeaderChangedNotification(raftGroupMemberId, raftPeerId);
    }

    public String toStateMachineLogEntryString(RaftProtos.StateMachineLogEntryProto stateMachineLogEntryProto) {
        return smProtoToString(this.gid, this.containerController, stateMachineLogEntryProto);
    }

    public static String smProtoToString(RaftGroupId raftGroupId, ContainerController containerController, RaftProtos.StateMachineLogEntryProto stateMachineLogEntryProto) {
        StringBuilder sb = new StringBuilder();
        try {
            ContainerProtos.ContainerCommandRequestProto containerCommandRequestProto = getContainerCommandRequestProto(raftGroupId, stateMachineLogEntryProto.getLogData());
            long containerID = containerCommandRequestProto.getContainerID();
            sb.append(TextFormat.shortDebugString(containerCommandRequestProto));
            if (containerController != null) {
                String containerLocation = containerController.getContainerLocation(containerID);
                sb.append(", container path=");
                sb.append(containerLocation);
            }
        } catch (Exception e) {
            LOG.info("smProtoToString failed", e);
            sb.append("smProtoToString failed with");
            sb.append(e.getMessage());
        }
        return sb.toString();
    }
}
