package io.atomix.protocols.log.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipEventListener;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.Member;
import io.atomix.cluster.MemberId;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveState;
import io.atomix.primitive.log.LogConsumer;
import io.atomix.primitive.log.LogProducer;
import io.atomix.primitive.log.LogRecord;
import io.atomix.primitive.log.LogSession;
import io.atomix.primitive.partition.PartitionId;
import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.primitive.partition.PrimaryElectionEventListener;
import io.atomix.primitive.partition.PrimaryTerm;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.LogClientProtocol;
import io.atomix.protocols.log.protocol.LogResponse;
import io.atomix.protocols.log.protocol.RecordsRequest;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/atomix/protocols/log/impl/DistributedLogSession.class */
public class DistributedLogSession implements LogSession {
    private final PartitionId partitionId;
    private final SessionId sessionId;
    private final LogClientProtocol protocol;
    private final PrimaryElection primaryElection;
    private final ThreadContext threadContext;
    private final MemberId memberId;
    private final String subject;
    private PrimaryTerm term;
    private final Logger log;
    private final Set<Consumer<PrimitiveState>> stateChangeListeners = Sets.newIdentityHashSet();
    private final ClusterMembershipEventListener membershipEventListener = this::handleClusterEvent;
    private final PrimaryElectionEventListener primaryElectionListener = primaryElectionEvent -> {
        changeReplicas(primaryElectionEvent.term());
    };
    private final DistributedLogProducer producer = new DistributedLogProducer();
    private final DistributedLogConsumer consumer = new DistributedLogConsumer();
    private volatile PrimitiveState state = PrimitiveState.CONNECTED;

    /* loaded from: input_file:io/atomix/protocols/log/impl/DistributedLogSession$DistributedLogConsumer.class */
    private class DistributedLogConsumer implements LogConsumer {
        private MemberId leader;
        private long index;
        private volatile Consumer<LogRecord> consumer;

        private DistributedLogConsumer() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CompletableFuture<Void> register(MemberId memberId) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            this.leader = memberId;
            DistributedLogSession.this.protocol.consume(memberId, ConsumeRequest.request(DistributedLogSession.this.memberId, DistributedLogSession.this.subject, this.index + 1)).whenCompleteAsync((consumeResponse, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else if (consumeResponse.status() == LogResponse.Status.OK) {
                    completableFuture.complete(null);
                } else {
                    completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                }
            }, (Executor) DistributedLogSession.this.threadContext);
            return completableFuture;
        }

        private void handleRecords(RecordsRequest recordsRequest) {
            if (recordsRequest.reset()) {
                this.index = recordsRequest.record().index() - 1;
            }
            if (recordsRequest.record().index() != this.index + 1) {
                DistributedLogSession.this.protocol.reset(this.leader, ResetRequest.request(DistributedLogSession.this.memberId, DistributedLogSession.this.subject, this.index + 1));
                return;
            }
            Consumer<LogRecord> consumer = this.consumer;
            if (consumer != null) {
                consumer.accept(recordsRequest.record());
                this.index = recordsRequest.record().index();
            }
        }

        public CompletableFuture<Void> consume(long j, Consumer<LogRecord> consumer) {
            return DistributedLogSession.this.term().thenCompose(primaryTerm -> {
                DistributedLogSession.this.protocol.registerRecordsConsumer(DistributedLogSession.this.subject, this::handleRecords, DistributedLogSession.this.threadContext);
                this.consumer = consumer;
                this.index = j - 1;
                return register(primaryTerm.primary().memberId());
            });
        }
    }

    /* loaded from: input_file:io/atomix/protocols/log/impl/DistributedLogSession$DistributedLogProducer.class */
    private class DistributedLogProducer implements LogProducer {
        private DistributedLogProducer() {
        }

        public CompletableFuture<Long> append(byte[] bArr) {
            CompletableFuture<Long> completableFuture = new CompletableFuture<>();
            DistributedLogSession.this.term().thenCompose(primaryTerm -> {
                return DistributedLogSession.this.protocol.append(primaryTerm.primary().memberId(), AppendRequest.request(bArr));
            }).whenCompleteAsync((appendResponse, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else if (appendResponse.status() == LogResponse.Status.OK) {
                    completableFuture.complete(Long.valueOf(appendResponse.index()));
                } else {
                    completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                }
            }, (Executor) DistributedLogSession.this.threadContext);
            return completableFuture;
        }
    }

    public DistributedLogSession(PartitionId partitionId, SessionId sessionId, ClusterMembershipService clusterMembershipService, LogClientProtocol logClientProtocol, PrimaryElection primaryElection, ThreadContext threadContext) {
        this.partitionId = (PartitionId) Preconditions.checkNotNull(partitionId, "partitionId cannot be null");
        this.sessionId = (SessionId) Preconditions.checkNotNull(sessionId, "sessionId cannot be null");
        this.protocol = (LogClientProtocol) Preconditions.checkNotNull(logClientProtocol, "protocol cannot be null");
        this.primaryElection = (PrimaryElection) Preconditions.checkNotNull(primaryElection, "primaryElection cannot be null");
        this.threadContext = (ThreadContext) Preconditions.checkNotNull(threadContext, "threadContext cannot be null");
        this.memberId = clusterMembershipService.getLocalMember().id();
        this.subject = String.format("%s-%s-%s", partitionId.group(), partitionId.id(), sessionId);
        clusterMembershipService.addListener(this.membershipEventListener);
        primaryElection.addListener(this.primaryElectionListener);
        this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(DistributedLogProducer.class).addValue(partitionId.group() != null ? String.format("%s-%d", partitionId.group(), partitionId.id()) : partitionId.id()).build());
    }

    public PartitionId partitionId() {
        return this.partitionId;
    }

    public SessionId sessionId() {
        return this.sessionId;
    }

    public ThreadContext context() {
        return this.threadContext;
    }

    public PrimitiveState getState() {
        return this.state;
    }

    public LogProducer producer() {
        return this.producer;
    }

    public LogConsumer consumer() {
        return this.consumer;
    }

    private void handleClusterEvent(ClusterMembershipEvent clusterMembershipEvent) {
        PrimaryTerm primaryTerm = this.term;
        if (primaryTerm != null && clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED && ((Member) clusterMembershipEvent.subject()).id().equals(primaryTerm.primary().memberId())) {
            changeState(PrimitiveState.SUSPENDED);
        }
    }

    private void changeReplicas(PrimaryTerm primaryTerm) {
        this.threadContext.execute(() -> {
            if (this.term == null || primaryTerm.term() > this.term.term()) {
                this.term = primaryTerm;
                this.consumer.register(primaryTerm.primary().memberId());
            }
        });
    }

    private synchronized void changeState(PrimitiveState primitiveState) {
        if (this.state != primitiveState) {
            this.state = primitiveState;
            this.stateChangeListeners.forEach(consumer -> {
                this.threadContext.execute(() -> {
                    consumer.accept(primitiveState);
                });
            });
        }
    }

    public void addStateChangeListener(Consumer<PrimitiveState> consumer) {
        this.stateChangeListeners.add((Consumer) Preconditions.checkNotNull(consumer));
    }

    public void removeStateChangeListener(Consumer<PrimitiveState> consumer) {
        this.stateChangeListeners.remove(Preconditions.checkNotNull(consumer));
    }

    public CompletableFuture<LogSession> connect() {
        return term().thenRun(() -> {
            changeState(PrimitiveState.CONNECTED);
        }).thenApply(r3 -> {
            return this;
        });
    }

    public CompletableFuture<Void> close() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            changeState(PrimitiveState.CLOSED);
            completableFuture.complete(null);
        });
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CompletableFuture<PrimaryTerm> term() {
        CompletableFuture<PrimaryTerm> completableFuture = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            if (this.term != null) {
                completableFuture.complete(this.term);
            } else {
                this.primaryElection.getTerm().whenCompleteAsync((primaryTerm, th) -> {
                    if (primaryTerm == null) {
                        completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                    } else {
                        this.term = primaryTerm;
                        completableFuture.complete(primaryTerm);
                    }
                });
            }
        });
        return completableFuture;
    }
}
