package io.atomix.protocols.log.partition.impl;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.primitive.PrimitiveException;
import io.atomix.protocols.log.protocol.AppendRequest;
import io.atomix.protocols.log.protocol.AppendResponse;
import io.atomix.protocols.log.protocol.ConsumeRequest;
import io.atomix.protocols.log.protocol.ConsumeResponse;
import io.atomix.protocols.log.protocol.LogClientProtocol;
import io.atomix.protocols.log.protocol.RecordsRequest;
import io.atomix.protocols.log.protocol.ResetRequest;
import io.atomix.utils.serializer.Serializer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

/* loaded from: input_file:io/atomix/protocols/log/partition/impl/LogClientCommunicator.class */
public class LogClientCommunicator implements LogClientProtocol {
    private final LogMessageContext context;
    private final Serializer serializer;
    private final ClusterCommunicationService clusterCommunicator;

    public LogClientCommunicator(String str, Serializer serializer, ClusterCommunicationService clusterCommunicationService) {
        this.context = new LogMessageContext(str);
        this.serializer = (Serializer) Preconditions.checkNotNull(serializer, "serializer cannot be null");
        this.clusterCommunicator = (ClusterCommunicationService) Preconditions.checkNotNull(clusterCommunicationService, "clusterCommunicator cannot be null");
    }

    private <T> void unicast(String str, T t, MemberId memberId) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.unicast(str, t, serializer::encode, memberId, false);
    }

    private <T, U> CompletableFuture<U> send(String str, T t, MemberId memberId) {
        CompletableFuture<U> completableFuture = new CompletableFuture<>();
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        Function function = serializer::encode;
        Serializer serializer2 = this.serializer;
        Objects.requireNonNull(serializer2);
        clusterCommunicationService.send(str, t, function, serializer2::decode, memberId).whenComplete((BiConsumer) (obj, th) -> {
            if (th == null) {
                completableFuture.complete(obj);
            } else if (Throwables.getRootCause(th) instanceof MessagingException.NoRemoteHandler) {
                completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
            } else {
                completableFuture.completeExceptionally(th);
            }
        });
        return completableFuture;
    }

    @Override // io.atomix.protocols.log.protocol.LogClientProtocol
    public CompletableFuture<AppendResponse> append(MemberId memberId, AppendRequest appendRequest) {
        return send(this.context.appendSubject, appendRequest, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogClientProtocol
    public CompletableFuture<ConsumeResponse> consume(MemberId memberId, ConsumeRequest consumeRequest) {
        return send(this.context.consumeSubject, consumeRequest, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogClientProtocol
    public void reset(MemberId memberId, ResetRequest resetRequest) {
        unicast(this.context.resetSubject, resetRequest, memberId);
    }

    @Override // io.atomix.protocols.log.protocol.LogClientProtocol
    public void registerRecordsConsumer(String str, Consumer<RecordsRequest> consumer, Executor executor) {
        ClusterCommunicationService clusterCommunicationService = this.clusterCommunicator;
        Serializer serializer = this.serializer;
        Objects.requireNonNull(serializer);
        clusterCommunicationService.subscribe(str, serializer::decode, consumer, executor);
    }

    @Override // io.atomix.protocols.log.protocol.LogClientProtocol
    public void unregisterRecordsConsumer(String str) {
        this.clusterCommunicator.unsubscribe(str);
    }
}
