package io.zeebe.broker.clustering.raft;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.ClusterServiceNames;
import io.zeebe.broker.clustering.management.OnOpenLogStreamListener;
import io.zeebe.broker.logstreams.LogStreamService;
import io.zeebe.broker.logstreams.LogStreamServiceNames;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.protocol.Protocol;
import io.zeebe.raft.Raft;
import io.zeebe.raft.RaftConfiguration;
import io.zeebe.raft.RaftPersistentStorage;
import io.zeebe.raft.RaftStateListener;
import io.zeebe.raft.state.RaftState;
import io.zeebe.servicecontainer.Injector;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.servicecontainer.ServiceStartContext;
import io.zeebe.servicecontainer.ServiceStopContext;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.buffer.BufferUtil;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.ActorScheduler;
import io.zeebe.util.sched.channel.OneToOneRingBufferChannel;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.List;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.ringbuffer.RingBufferDescriptor;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/broker/clustering/raft/RaftService.class */
/**
 * Service that owns the {@link Raft} instance of a single log stream.
 *
 * <p>On start the log stream is opened; once it is open a raft is created, the configured members
 * are added and the raft is submitted to the actor scheduler. The service also registers itself as
 * a {@link RaftStateListener}: when the raft becomes leader a {@link LogStreamService} is
 * installed, and when it becomes follower an installed log stream service is removed again.
 */
public class RaftService extends Actor implements Service<Raft>, RaftStateListener {
    private static final Logger LOG = Loggers.SERVICES_LOGGER;

    /** Capacity (2 MiB) of the ring buffer handed to the raft, excluding the ring buffer trailer. */
    private static final int RING_BUFFER_CAPACITY = 2 * 1024 * 1024;

    private final RaftConfiguration configuration;
    private final SocketAddress socketAddress;
    private final LogStream logStream;
    private final List<SocketAddress> members;
    private final RaftPersistentStorage persistentStorage;
    private final RaftStateListener raftStateListener;
    private final ServiceName<LogStream> logStreamServiceName;
    private final OnOpenLogStreamListener onOpenLogStreamListener;
    private final ServiceName<Raft> raftServiceName;
    private final Injector<ClientTransport> clientTransportInjector = new Injector<>();
    private ActorScheduler actorScheduler;
    private Raft raft;
    private CompletableActorFuture<Void> raftServiceCloseFuture;
    private CompletableActorFuture<Void> raftServiceOpenFuture;
    private ServiceStartContext startContext;
    private RaftState currentRaftState;

    /**
     * Creates the service; nothing is opened or scheduled until {@link #start(ServiceStartContext)}.
     *
     * @param raftConfiguration configuration passed to the raft
     * @param socketAddress socket address passed to the raft
     * @param logStream log stream that is opened on start and closed on stop
     * @param list members added to the raft once it has been created
     * @param raftPersistentStorage persistent storage passed to the raft
     * @param raftStateListener additional state listener registered on the raft next to this service
     * @param onOpenLogStreamListener notified after the log stream service was installed
     * @param serviceName name of this raft service, used as dependency of the log stream service
     */
    public RaftService(RaftConfiguration raftConfiguration, SocketAddress socketAddress, LogStream logStream, List<SocketAddress> list, RaftPersistentStorage raftPersistentStorage, RaftStateListener raftStateListener, OnOpenLogStreamListener onOpenLogStreamListener, ServiceName<Raft> serviceName) {
        this.configuration = raftConfiguration;
        this.socketAddress = socketAddress;
        this.logStream = logStream;
        this.members = list;
        this.persistentStorage = raftPersistentStorage;
        this.raftStateListener = raftStateListener;
        this.logStreamServiceName = LogStreamServiceNames.logStreamServiceName(logStream.getLogName());
        this.onOpenLogStreamListener = onOpenLogStreamListener;
        this.raftServiceName = serviceName;
    }

    /**
     * Opens the log stream and, on success, creates and schedules the raft. The outcome is
     * reported through the open future that {@link #start(ServiceStartContext)} handed to the
     * start context.
     */
    protected void onActorStarted() {
        this.actor.runOnCompletion(this.logStream.openAsync(), (ignored, openError) -> {
            if (openError != null) {
                this.raftServiceOpenFuture.completeExceptionally(openError);
                // Pass the throwable along so the cause is not lost in the log output.
                Loggers.CLUSTERING_LOGGER.debug("Failed to open log stream.", openError);
                return;
            }

            final OneToOneRingBufferChannel channel = new OneToOneRingBufferChannel(
                new UnsafeBuffer(new byte[RING_BUFFER_CAPACITY + RingBufferDescriptor.TRAILER_LENGTH]));
            this.raft = new Raft(
                this.actorScheduler,
                this.configuration,
                this.socketAddress,
                this.logStream,
                this.clientTransportInjector.getValue(),
                this.persistentStorage,
                channel,
                new RaftStateListener[]{this.raftStateListener, this});
            this.raft.addMembers(this.members);
            this.actorScheduler.submitActor(this.raft);
            this.raftServiceOpenFuture.complete(null);
        });
    }

    /**
     * Starts the service asynchronously; the start completes when the open future is completed by
     * {@link #onActorStarted()}.
     *
     * @param serviceStartContext context providing the scheduler; also kept to install and remove
     *     the log stream service on raft state changes
     */
    public void start(ServiceStartContext serviceStartContext) {
        // Initialise all state before submitting the actor so that callbacks running on the actor
        // never observe an unset start context or open future.
        this.startContext = serviceStartContext;
        this.actorScheduler = serviceStartContext.getScheduler();
        this.raftServiceOpenFuture = new CompletableActorFuture<>();
        this.actorScheduler.submitActor(this);
        this.startContext.async(this.raftServiceOpenFuture);
    }

    protected void onActorClosing() {
    }

    /**
     * Stops the service asynchronously: closes the raft, then the log stream, and finally this
     * actor. The log stream is closed even if closing the raft failed; the first failure (raft
     * before log stream) is reported through the close future.
     *
     * @param serviceStopContext context that waits for the close future
     */
    public void stop(ServiceStopContext serviceStopContext) {
        this.raftServiceCloseFuture = new CompletableActorFuture<>();
        this.actor.call(() -> {
            this.actor.runOnCompletion(this.raft.close(), (raftResult, raftCloseError) -> {
                this.actor.runOnCompletion(this.logStream.closeAsync(), (logResult, logCloseError) -> {
                    if (raftCloseError != null) {
                        this.raftServiceCloseFuture.completeExceptionally(raftCloseError);
                    } else if (logCloseError != null) {
                        this.raftServiceCloseFuture.completeExceptionally(logCloseError);
                    } else {
                        this.raftServiceCloseFuture.complete(null);
                    }
                    this.actor.close();
                });
            });
        });
        serviceStopContext.async(this.raftServiceCloseFuture);
    }

    /**
     * Returns the raft created by this service.
     *
     * @return the raft, or {@code null} if it has not been created (yet)
     */
    public Raft get() {
        return this.raft;
    }

    /**
     * Alias of {@link #get()} under the name the decompiler assigned when it merged the bridge
     * method; kept so that existing references to this name continue to work.
     *
     * @return the raft, or {@code null} if it has not been created (yet)
     */
    public Raft m22get() {
        return get();
    }

    public Injector<ClientTransport> getClientTransportInjector() {
        return this.clientTransportInjector;
    }

    /**
     * Reacts to a raft state change on this actor. On {@link RaftState#LEADER} the log stream
     * service is installed and, after a successful installation, the open-log-stream listener is
     * notified. On {@link RaftState#FOLLOWER} the log stream service is removed if the start
     * context has it.
     *
     * @param i not used by this implementation
     * @param directBuffer not used by this implementation
     * @param socketAddress not used by this implementation
     * @param raftState the new raft state
     */
    public void onStateChange(int i, DirectBuffer directBuffer, SocketAddress socketAddress, RaftState raftState) {
        this.actor.call(() -> {
            this.currentRaftState = raftState;
            if (this.currentRaftState == RaftState.LEADER) {
                final LogStream raftLogStream = this.raft.getLogStream();
                Loggers.CLUSTERING_LOGGER.debug("Start log stream...topic {}", BufferUtil.bufferAsString(raftLogStream.getTopicName()));
                this.actor.runOnCompletion(
                    this.startContext.createService(this.logStreamServiceName, new LogStreamService(raftLogStream))
                        .dependency(ClusterServiceNames.CLUSTER_MANAGER_SERVICE)
                        .dependency(this.raftServiceName)
                        .group(Protocol.SYSTEM_TOPIC_BUF.equals(raftLogStream.getTopicName())
                            ? LogStreamServiceNames.SYSTEM_STREAM_GROUP
                            : LogStreamServiceNames.WORKFLOW_STREAM_GROUP)
                        .install(),
                    (installResult, installError) -> {
                        if (installError == null) {
                            this.actor.submit(() -> {
                                this.onOpenLogStreamListener.onOpenLogStreamService(this.raft.getLogStream());
                            });
                        } else {
                            // Trailing throwable is logged with its stack trace by SLF4J.
                            LOG.error("Failed to install log stream service '{}'", this.logStreamServiceName, installError);
                        }
                    });
            } else if (this.currentRaftState == RaftState.FOLLOWER && this.startContext.hasService(this.logStreamServiceName)) {
                this.startContext.removeService(this.logStreamServiceName);
            }
        });
    }
}
