package io.atomix.protocols.backup.proxy;

import ch.qos.logback.core.joran.action.Action;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import io.atomix.cluster.ClusterEvent;
import io.atomix.cluster.ClusterEventListener;
import io.atomix.cluster.ClusterService;
import io.atomix.primitive.PrimitiveException;
import io.atomix.primitive.PrimitiveType;
import io.atomix.primitive.event.PrimitiveEvent;
import io.atomix.primitive.operation.PrimitiveOperation;
import io.atomix.primitive.partition.PrimaryElection;
import io.atomix.primitive.partition.PrimaryElectionEventListener;
import io.atomix.primitive.partition.PrimaryTerm;
import io.atomix.primitive.proxy.PrimitiveProxy;
import io.atomix.primitive.proxy.impl.AbstractPrimitiveProxy;
import io.atomix.primitive.session.SessionId;
import io.atomix.protocols.backup.protocol.CloseRequest;
import io.atomix.protocols.backup.protocol.ExecuteRequest;
import io.atomix.protocols.backup.protocol.PrimaryBackupClientProtocol;
import io.atomix.protocols.backup.protocol.PrimaryBackupResponse;
import io.atomix.protocols.backup.protocol.PrimitiveDescriptor;
import io.atomix.utils.concurrent.ComposableFuture;
import io.atomix.utils.concurrent.ThreadContext;
import io.atomix.utils.logging.ContextualLoggerFactory;
import io.atomix.utils.logging.LoggerContext;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import javax.ws.rs.core.Link;
import org.slf4j.Logger;

/* loaded from: input_file:io/atomix/protocols/backup/proxy/PrimaryBackupProxy.class */
/**
 * Primitive proxy that sends operations to the primary node of the current {@link PrimaryTerm}.
 *
 * <p>The proxy tracks the latest known term through a {@link PrimaryElection} listener and, when
 * the known term has no primary or a request is rejected, asks the election for a newer term
 * before giving up with {@link PrimitiveException.Unavailable}.
 *
 * <p>The {@code term} field is written on the supplied {@link ThreadContext}; all asynchronous
 * callbacks in this class are scheduled on that context as well.
 *
 * <p>NOTE(review): nothing in this class registers {@code clusterEventListener} with the
 * {@link ClusterService} (it is only removed in {@link #close()}), and {@code state} is never
 * moved away from {@code CLOSED} except to {@code SUSPENDED} — confirm whether registration
 * happens elsewhere or is missing.
 */
public class PrimaryBackupProxy extends AbstractPrimitiveProxy {
    private final Logger log;
    private final PrimitiveType primitiveType;
    private final PrimitiveDescriptor descriptor;
    private final ClusterService clusterService;
    private final PrimaryBackupClientProtocol protocol;
    private final SessionId sessionId;
    private final PrimaryElection primaryElection;
    private final ThreadContext threadContext;
    /** Latest known term; {@code null} until an election event arrives or {@link #connect()} succeeds. */
    private PrimaryTerm term;
    private final Set<Consumer<PrimitiveProxy.State>> stateChangeListeners = Sets.newIdentityHashSet();
    private final Set<Consumer<PrimitiveEvent>> eventListeners = Sets.newIdentityHashSet();
    private final PrimaryElectionEventListener primaryElectionListener = primaryElectionEvent -> {
        changeReplicas(primaryElectionEvent.term());
    };
    private final ClusterEventListener clusterEventListener = this::handleClusterEvent;
    private volatile PrimitiveProxy.State state = PrimitiveProxy.State.CLOSED;

    /**
     * Creates the proxy and subscribes it to primary election events.
     *
     * @param str value added to the logger context (presumably the client name — confirm with callers)
     * @param sessionId identifier of the session this proxy represents; must not be {@code null}
     * @param primitiveType type of the primitive; its id is added to the logger context
     * @param primitiveDescriptor descriptor of the primitive; supplies the proxy name
     * @param clusterService cluster service used to look up the local node
     * @param primaryBackupClientProtocol protocol used to talk to the primary
     * @param primaryElection election used to discover the current term
     * @param threadContext context on which proxy state is mutated and callbacks are run
     */
    public PrimaryBackupProxy(String str, SessionId sessionId, PrimitiveType primitiveType, PrimitiveDescriptor primitiveDescriptor, ClusterService clusterService, PrimaryBackupClientProtocol primaryBackupClientProtocol, PrimaryElection primaryElection, ThreadContext threadContext) {
        this.sessionId = (SessionId) Preconditions.checkNotNull(sessionId);
        this.primitiveType = primitiveType;
        this.descriptor = primitiveDescriptor;
        this.clusterService = clusterService;
        this.protocol = primaryBackupClientProtocol;
        this.primaryElection = primaryElection;
        this.threadContext = threadContext;
        primaryElection.addListener(this.primaryElectionListener);
        // Plain "type"/"name" keys: same values as the unrelated JAX-RS/logback constants
        // previously referenced here, without coupling the logger context to those libraries.
        // NOTE(review): build2() looks like a decompiler-renamed bridge of build() — confirm.
        this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(PrimitiveProxy.class)
                .addValue(str)
                .add("type", primitiveType.id())
                .add("name", primitiveDescriptor.name())
                .build2());
    }

    /** Returns the session identifier supplied at construction. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public SessionId sessionId() {
        return this.sessionId;
    }

    /** Returns the primitive name taken from the descriptor. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public String name() {
        return this.descriptor.name();
    }

    /** Returns the primitive type supplied at construction. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public PrimitiveType serviceType() {
        return this.primitiveType;
    }

    /** Returns the current proxy state. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public PrimitiveProxy.State getState() {
        return this.state;
    }

    /** Registers a listener that is notified when the proxy state changes. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxyExecutor
    public void addStateChangeListener(Consumer<PrimitiveProxy.State> consumer) {
        this.stateChangeListeners.add(consumer);
    }

    /** Removes a previously registered state change listener. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxyExecutor
    public void removeStateChangeListener(Consumer<PrimitiveProxy.State> consumer) {
        this.stateChangeListeners.remove(consumer);
    }

    /**
     * Executes an operation against the current primary.
     *
     * <p>If no term is known yet, or the known term has no primary, the election is queried once;
     * the operation is sent only if that yields a term with a primary that is newer than the known
     * one.
     *
     * @param primitiveOperation the operation to execute
     * @return a future completed with the operation result, or exceptionally with
     *     {@link PrimitiveException.Unavailable} when no usable primary can be found
     */
    @Override // io.atomix.primitive.proxy.PrimitiveProxyExecutor
    public CompletableFuture<byte[]> execute(PrimitiveOperation primitiveOperation) {
        ComposableFuture<byte[]> future = new ComposableFuture<>();
        this.threadContext.execute(() -> {
            PrimaryTerm current = this.term;
            if (current != null && current.primary() != null) {
                execute(primitiveOperation, future);
                return;
            }
            this.primaryElection.getTerm().whenCompleteAsync((latestTerm, error) -> {
                PrimaryTerm known = this.term;
                // A missing local term (execute before connect) is treated like a term without
                // a primary instead of throwing and leaving the returned future incomplete.
                boolean usable = error == null
                        && latestTerm.primary() != null
                        && (known == null || latestTerm.term() > known.term());
                if (!usable) {
                    future.completeExceptionally(new PrimitiveException.Unavailable());
                    return;
                }
                this.term = latestTerm;
                execute(primitiveOperation, future);
            }, this.threadContext);
        });
        return future;
    }

    /**
     * Sends the operation to the primary of the current term and completes {@code composableFuture}
     * with the outcome. Must be called on the thread context with a term that has a primary.
     *
     * <p>On a non-OK response the operation is retried if a newer term with a primary is already
     * known or can be obtained from the election; otherwise the future fails with
     * {@link PrimitiveException.Unavailable}. Transport errors are propagated unchanged.
     */
    private void execute(PrimitiveOperation primitiveOperation, ComposableFuture<byte[]> composableFuture) {
        // Capture the term once so the log line, the request target and the retry comparison agree.
        PrimaryTerm sentTerm = this.term;
        ExecuteRequest request = ExecuteRequest.request(this.descriptor, this.sessionId.id().longValue(), this.clusterService.getLocalNode().id(), primitiveOperation);
        this.log.trace("Sending {} to {}", request, sentTerm.primary());
        this.protocol.execute(sentTerm.primary(), request).whenCompleteAsync((response, error) -> {
            if (error != null) {
                composableFuture.completeExceptionally(error);
                return;
            }
            this.log.trace("Received {}", response);
            if (response.status() == PrimaryBackupResponse.Status.OK) {
                composableFuture.complete(response.result());
                return;
            }
            if (this.term.term() > sentTerm.term()) {
                // An election event already moved us to a newer term; retry against it.
                execute(primitiveOperation).whenComplete(composableFuture);
                return;
            }
            // Run on the thread context so the term can be updated safely before retrying.
            this.primaryElection.getTerm().whenCompleteAsync((latestTerm, termError) -> {
                if (termError != null || latestTerm.term() <= sentTerm.term() || latestTerm.primary() == null) {
                    composableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                    return;
                }
                // Adopt the newer term; otherwise the retry would target the same stale primary.
                if (latestTerm.term() > this.term.term()) {
                    this.term = latestTerm;
                }
                execute(primitiveOperation).whenComplete(composableFuture);
            }, this.threadContext);
        }, this.threadContext);
    }

    /** Registers a listener for primitive events received by this proxy. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxyExecutor
    public void addEventListener(Consumer<PrimitiveEvent> consumer) {
        this.eventListeners.add(consumer);
    }

    /** Removes a previously registered primitive event listener. */
    @Override // io.atomix.primitive.proxy.PrimitiveProxyExecutor
    public void removeEventListener(Consumer<PrimitiveEvent> consumer) {
        this.eventListeners.remove(consumer);
    }

    /** Adopts {@code primaryTerm} on the thread context if it is newer than the known term. */
    private void changeReplicas(PrimaryTerm primaryTerm) {
        this.threadContext.execute(() -> {
            if (this.term == null || primaryTerm.term() > this.term.term()) {
                this.term = primaryTerm;
            }
        });
    }

    /**
     * Marks the proxy {@code SUSPENDED} and notifies state listeners when the node that was
     * deactivated is the primary of the known term.
     */
    private void handleClusterEvent(ClusterEvent clusterEvent) {
        PrimaryTerm current = this.term;
        if (current == null) {
            // No term known yet, so the deactivated node cannot be our primary.
            return;
        }
        if (clusterEvent.type() == ClusterEvent.Type.NODE_DEACTIVATED && clusterEvent.subject().id().equals(current.primary())) {
            this.threadContext.execute(() -> {
                this.state = PrimitiveProxy.State.SUSPENDED;
                this.stateChangeListeners.forEach(consumer -> {
                    consumer.accept(this.state);
                });
            });
        }
    }

    /** Logs the event and forwards it to every registered event listener. */
    private void handleEvent(PrimitiveEvent primitiveEvent) {
        this.log.trace("Received {}", primitiveEvent);
        this.eventListeners.forEach(consumer -> {
            consumer.accept(primitiveEvent);
        });
    }

    /**
     * Fetches the current term and, if it has a primary, stores it and registers this proxy's
     * event listener with the protocol.
     *
     * @return a future completed with this proxy, or exceptionally with
     *     {@link PrimitiveException.Unavailable} if the term lookup fails or has no primary
     */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public CompletableFuture<PrimitiveProxy> connect() {
        CompletableFuture<PrimitiveProxy> future = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            this.primaryElection.getTerm().whenCompleteAsync((latestTerm, error) -> {
                if (error != null || latestTerm.primary() == null) {
                    future.completeExceptionally(new PrimitiveException.Unavailable());
                    return;
                }
                this.term = latestTerm;
                this.protocol.registerEventListener(this.sessionId, this::handleEvent, this.threadContext);
                future.complete(this);
            }, this.threadContext);
        });
        return future;
    }

    /**
     * Closes the session on the primary (when one is known) and detaches all listeners.
     *
     * <p>Listeners are detached even when no primary is known, so the election listener added in
     * the constructor does not outlive the proxy. The result of the close request is ignored; the
     * returned future always completes normally.
     *
     * @return a future completed once cleanup has run
     */
    @Override // io.atomix.primitive.proxy.PrimitiveProxy
    public CompletableFuture<Void> close() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        PrimaryTerm current = this.term;
        if (current != null && current.primary() != null) {
            this.protocol.close(current.primary(), new CloseRequest(this.descriptor, this.sessionId.id().longValue())).whenCompleteAsync((closeResponse, error) -> {
                releaseListeners();
                future.complete(null);
            }, this.threadContext);
        } else {
            releaseListeners();
            future.complete(null);
        }
        return future;
    }

    /** Unregisters the protocol event listener and removes the cluster and election listeners. */
    private void releaseListeners() {
        this.protocol.unregisterEventListener(this.sessionId);
        this.clusterService.removeListener(this.clusterEventListener);
        this.primaryElection.removeListener(this.primaryElectionListener);
    }
}
