/*
 * Decompiled with CFR 0.152.
 */
package io.zeebe.broker.clustering.management;

import io.zeebe.broker.Loggers;
import io.zeebe.broker.clustering.ClusterServiceNames;
import io.zeebe.broker.clustering.gossip.data.Peer;
import io.zeebe.broker.clustering.management.ClusterManagerContext;
import io.zeebe.broker.clustering.management.StartLogStreamServiceController;
import io.zeebe.broker.clustering.management.config.ClusterManagementConfig;
import io.zeebe.broker.clustering.management.handler.ClusterManagerFragmentHandler;
import io.zeebe.broker.clustering.management.message.InvitationRequest;
import io.zeebe.broker.clustering.management.message.InvitationResponse;
import io.zeebe.broker.clustering.raft.RaftPersistentFileStorage;
import io.zeebe.broker.clustering.raft.RaftService;
import io.zeebe.broker.logstreams.LogStreamsManager;
import io.zeebe.broker.system.SystemServiceNames;
import io.zeebe.broker.transport.TransportServiceNames;
import io.zeebe.logstreams.impl.log.fs.FsLogStorage;
import io.zeebe.logstreams.log.LogStream;
import io.zeebe.raft.Raft;
import io.zeebe.raft.RaftPersistentStorage;
import io.zeebe.servicecontainer.Service;
import io.zeebe.servicecontainer.ServiceContainer;
import io.zeebe.servicecontainer.ServiceName;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.RequestResponseController;
import io.zeebe.transport.ServerInputSubscription;
import io.zeebe.transport.ServerMessageHandler;
import io.zeebe.transport.ServerOutput;
import io.zeebe.transport.ServerRequestHandler;
import io.zeebe.transport.ServerResponse;
import io.zeebe.transport.SocketAddress;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.buffer.BufferWriter;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.agrona.DirectBuffer;
import org.agrona.concurrent.ManyToOneConcurrentArrayQueue;
import org.slf4j.Logger;

public class ClusterManager
implements Actor {
    public static final Logger LOG = Loggers.CLUSTERING_LOGGER;
    private final ClusterManagerContext context;
    private final ServiceContainer serviceContainer;
    private final List<Raft> rafts;
    private final List<StartLogStreamServiceController> startLogStreamServiceControllers;
    private final ManyToOneConcurrentArrayQueue<Runnable> managementCmdQueue;
    private final Consumer<Runnable> commandConsumer;
    private final List<RequestResponseController> activeRequestControllers;
    private final InvitationRequest invitationRequest;
    private final InvitationResponse invitationResponse;
    private ClusterManagementConfig config;
    protected final ServerResponse response = new ServerResponse();
    protected final ServerInputSubscription inputSubscription;

    public ClusterManager(ClusterManagerContext context, ServiceContainer serviceContainer, ClusterManagementConfig config) {
        this.context = context;
        this.serviceContainer = serviceContainer;
        this.config = config;
        this.rafts = new CopyOnWriteArrayList<Raft>();
        this.startLogStreamServiceControllers = new CopyOnWriteArrayList<StartLogStreamServiceController>();
        this.managementCmdQueue = new ManyToOneConcurrentArrayQueue(100);
        this.commandConsumer = Runnable::run;
        this.activeRequestControllers = new CopyOnWriteArrayList<RequestResponseController>();
        this.invitationRequest = new InvitationRequest();
        this.invitationResponse = new InvitationResponse();
        ClusterManagerFragmentHandler fragmentHandler = new ClusterManagerFragmentHandler(this);
        this.inputSubscription = (ServerInputSubscription)context.getServerTransport().openSubscription("cluster-management", (ServerMessageHandler)fragmentHandler, (ServerRequestHandler)fragmentHandler).join();
        context.getPeers().registerListener(this::addPeer);
    }

    public void open() {
        LogStreamsManager logStreamManager = this.context.getLogStreamsManager();
        File storageDirectory = new File(this.config.directory);
        if (!storageDirectory.exists()) {
            try {
                storageDirectory.getParentFile().mkdirs();
                Files.createDirectory(storageDirectory.toPath(), new FileAttribute[0]);
            }
            catch (IOException e) {
                LOG.error("Unable to create directory {}", (Object)storageDirectory, (Object)e);
            }
        }
        SocketAddress socketAddress = this.context.getLocalPeer().replicationEndpoint();
        File[] storageFiles = storageDirectory.listFiles();
        if (storageFiles != null && storageFiles.length > 0) {
            for (int i = 0; i < storageFiles.length; ++i) {
                int partitionId;
                File storageFile = storageFiles[i];
                RaftPersistentFileStorage storage = new RaftPersistentFileStorage(storageFile.getAbsolutePath());
                DirectBuffer topicName = storage.getTopicName();
                LogStream logStream2 = logStreamManager.getLogStream(topicName, partitionId = storage.getPartitionId());
                if (logStream2 == null) {
                    String directory = storage.getLogDirectory();
                    logStream2 = logStreamManager.createLogStream(topicName, partitionId, directory);
                }
                storage.setLogStream(logStream2);
                this.createRaft(socketAddress, logStream2, storage.getMembers(), storage);
            }
        } else if (this.context.getPeers().sizeVolatile() == 1) {
            logStreamManager.forEachLogStream(logStream -> this.createRaft(socketAddress, (LogStream)logStream, Collections.emptyList()));
        }
    }

    public String name() {
        return "management";
    }

    public int doWork() throws Exception {
        int workcount = 0;
        workcount += this.managementCmdQueue.drain(this.commandConsumer);
        workcount += this.inputSubscription.poll();
        int i = 0;
        while (i < this.activeRequestControllers.size()) {
            RequestResponseController requestController = this.activeRequestControllers.get(i);
            workcount += requestController.doWork();
            if (requestController.isFailed() || requestController.isResponseAvailable()) {
                requestController.close();
            }
            if (requestController.isClosed()) {
                this.activeRequestControllers.remove(i);
                continue;
            }
            ++i;
        }
        for (int j = 0; j < this.startLogStreamServiceControllers.size(); ++j) {
            workcount += this.startLogStreamServiceControllers.get(j).doWork();
        }
        return workcount;
    }

    public void addPeer(Peer peer) {
        Peer copy = new Peer();
        copy.wrap(peer);
        this.managementCmdQueue.add(() -> {
            for (int i = 0; i < this.rafts.size(); ++i) {
                Raft raft = this.rafts.get(i);
                ArrayList<SocketAddress> members = new ArrayList<SocketAddress>();
                members.add(raft.getSocketAddress());
                raft.getMembers().forEach(raftMember -> members.add(raftMember.getRemoteAddress().getAddress()));
                LogStream logStream = raft.getLogStream();
                InvitationRequest invitationRequest = new InvitationRequest().topicName(logStream.getTopicName()).partitionId(logStream.getPartitionId()).term(raft.getTerm()).members(members);
                RequestResponseController requestController = new RequestResponseController(this.context.getClientTransport());
                requestController.open(copy.managementEndpoint(), (BufferWriter)invitationRequest, null);
                this.activeRequestControllers.add(requestController);
            }
        });
    }

    public void addRaft(Raft raft) {
        this.managementCmdQueue.add(() -> {
            this.context.getLocalPeer().addRaft(raft);
            this.rafts.add(raft);
            this.startLogStreamServiceControllers.add(new StartLogStreamServiceController(raft, this.serviceContainer));
        });
    }

    public void removeRaft(Raft raft) {
        LogStream logStream = raft.getLogStream();
        DirectBuffer topicName = logStream.getTopicName();
        int partitionId = logStream.getPartitionId();
        this.managementCmdQueue.add(() -> {
            LogStream stream;
            Raft r;
            int i;
            for (i = 0; i < this.rafts.size(); ++i) {
                r = this.rafts.get(i);
                stream = r.getLogStream();
                if (!topicName.equals(stream.getTopicName()) || partitionId != stream.getPartitionId()) continue;
                this.context.getLocalPeer().removeRaft(raft);
                this.rafts.remove(i);
                break;
            }
            for (i = 0; i < this.startLogStreamServiceControllers.size(); ++i) {
                r = this.startLogStreamServiceControllers.get(i).getRaft();
                stream = r.getLogStream();
                if (!topicName.equals(stream.getTopicName()) || partitionId != stream.getPartitionId()) continue;
                this.startLogStreamServiceControllers.remove(i);
                break;
            }
        });
    }

    public void createRaft(SocketAddress socketAddress, LogStream logStream, List<SocketAddress> members) {
        FsLogStorage logStorage = (FsLogStorage)logStream.getLogStorage();
        String path = logStorage.getConfig().getPath();
        RaftPersistentFileStorage storage = new RaftPersistentFileStorage(String.format("%s%s.meta", this.config.directory, logStream.getLogName()));
        storage.setLogStream(logStream).setLogDirectory(path).save();
        this.createRaft(socketAddress, logStream, members, storage);
    }

    public void createRaft(SocketAddress socketAddress, LogStream logStream, List<SocketAddress> members, RaftPersistentStorage persistentStorage) {
        RaftService raftService = new RaftService(socketAddress, logStream, members, persistentStorage);
        ServiceName<Raft> raftServiceName = ClusterServiceNames.raftServiceName(logStream.getLogName());
        this.serviceContainer.createService(raftServiceName, (Service)raftService).group(ClusterServiceNames.RAFT_SERVICE_GROUP).dependency(SystemServiceNames.ACTOR_SCHEDULER_SERVICE, raftService.getActorSchedulerInjector()).dependency(TransportServiceNames.bufferingServerTransport("replicationApi.server"), raftService.getServerTransportInjector()).dependency(TransportServiceNames.clientTransport("replicationApi.client"), raftService.getClientTransportInjector()).install();
    }

    public boolean onInvitationRequest(DirectBuffer buffer, int offset, int length, ServerOutput output, RemoteAddress requestAddress, long requestId) {
        this.invitationRequest.reset();
        this.invitationRequest.wrap(buffer, offset, length);
        DirectBuffer topicName = this.invitationRequest.topicName();
        int partitionId = this.invitationRequest.partitionId();
        LogStreamsManager logStreamManager = this.context.getLogStreamsManager();
        LogStream logStream = logStreamManager.createLogStream(topicName, partitionId);
        SocketAddress socketAddress = this.context.getLocalPeer().replicationEndpoint();
        this.createRaft(socketAddress, logStream, new ArrayList<SocketAddress>(this.invitationRequest.members()));
        this.invitationResponse.reset();
        this.response.reset().remoteAddress(requestAddress).requestId(requestId).writer((BufferWriter)this.invitationResponse);
        return output.sendResponse(this.response);
    }
}

