package io.atomix.protocols.log.roles;

import io.atomix.cluster.MemberId;
import io.atomix.protocols.log.impl.DistributedLogServerContext;
import io.atomix.protocols.log.protocol.BackupOperation;
import io.atomix.protocols.log.protocol.BackupRequest;
import io.atomix.protocols.log.protocol.LogResponse;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.slf4j.Logger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:io/atomix/protocols/log/roles/SynchronousReplicator.class */
public class SynchronousReplicator implements Replicator {
    private final DistributedLogServerContext context;
    private final Logger log;
    private final Map<MemberId, BackupQueue> queues = new HashMap();
    private final Map<Long, CompletableFuture<Void>> futures = new LinkedHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/atomix/protocols/log/roles/SynchronousReplicator$BackupQueue.class */
    public final class BackupQueue {
        private final Queue<BackupOperation> operations = new LinkedList();
        private final MemberId memberId;
        private boolean inProgress;
        private long ackedIndex;

        BackupQueue(MemberId memberId) {
            this.memberId = memberId;
        }

        void add(BackupOperation backupOperation) {
            this.operations.add(backupOperation);
            maybeBackup();
        }

        private void maybeBackup() {
            if (this.inProgress || this.operations.isEmpty()) {
                return;
            }
            this.inProgress = true;
            backup();
        }

        private void backup() {
            long j;
            LinkedList linkedList = new LinkedList();
            long j2 = 0;
            while (true) {
                j = j2;
                if (linkedList.size() >= 100 || this.operations.isEmpty()) {
                    break;
                }
                BackupOperation remove = this.operations.remove();
                linkedList.add(remove);
                j2 = remove.index();
            }
            BackupRequest request = BackupRequest.request(SynchronousReplicator.this.context.memberId(), SynchronousReplicator.this.context.currentTerm(), SynchronousReplicator.this.context.getCommitIndex(), linkedList);
            SynchronousReplicator.this.log.trace("Sending {} to {}", request, this.memberId);
            SynchronousReplicator.this.context.protocol().backup(this.memberId, request).whenCompleteAsync((backupResponse, th) -> {
                if (th == null) {
                    SynchronousReplicator.this.log.trace("Received {} from {}", backupResponse, this.memberId);
                    if (backupResponse.status() == LogResponse.Status.OK) {
                        this.ackedIndex = j;
                        SynchronousReplicator.this.completeFutures();
                    } else {
                        SynchronousReplicator.this.log.trace("Replication to {} failed!", this.memberId);
                    }
                } else {
                    SynchronousReplicator.this.log.trace("Replication to {} failed! {}", this.memberId, th);
                }
                this.inProgress = false;
                maybeBackup();
            }, (Executor) SynchronousReplicator.this.context.threadContext());
            linkedList.clear();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SynchronousReplicator(DistributedLogServerContext distributedLogServerContext, Logger logger) {
        this.context = distributedLogServerContext;
        this.log = logger;
    }

    @Override // io.atomix.protocols.log.roles.Replicator
    public CompletableFuture<Void> replicate(BackupOperation backupOperation) {
        if (this.context.followers().isEmpty()) {
            this.context.setCommitIndex(backupOperation.index());
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.futures.put(Long.valueOf(backupOperation.index()), completableFuture);
        Iterator<MemberId> it = this.context.followers().iterator();
        while (it.hasNext()) {
            this.queues.computeIfAbsent(it.next(), memberId -> {
                return new BackupQueue(memberId);
            }).add(backupOperation);
        }
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completeFutures() {
        long longValue = ((Long) this.queues.values().stream().map(backupQueue -> {
            return Long.valueOf(backupQueue.ackedIndex);
        }).reduce((v0, v1) -> {
            return Math.min(v0, v1);
        }).orElse(0L)).longValue();
        long commitIndex = this.context.getCommitIndex();
        while (true) {
            long j = commitIndex + 1;
            if (j > longValue) {
                this.context.setCommitIndex(longValue);
                return;
            }
            CompletableFuture<Void> remove = this.futures.remove(Long.valueOf(j));
            if (remove != null) {
                remove.complete(null);
            }
            commitIndex = j;
        }
    }

    @Override // io.atomix.protocols.log.roles.Replicator
    public void close() {
        this.futures.values().forEach(completableFuture -> {
            completableFuture.completeExceptionally(new IllegalStateException("Not the primary"));
        });
    }
}
