package io.zeebe.client.clustering.impl;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.zeebe.client.clustering.Topology;
import io.zeebe.client.cmd.BrokerErrorException;
import io.zeebe.client.impl.ControlMessageRequestHandler;
import io.zeebe.protocol.clientapi.ErrorResponseDecoder;
import io.zeebe.protocol.clientapi.MessageHeaderDecoder;
import io.zeebe.transport.ClientOutput;
import io.zeebe.transport.ClientResponse;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.clock.ActorClock;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.agrona.DirectBuffer;

/* loaded from: input_file:io/zeebe/client/clustering/impl/ClientTopologyManager.class */
/**
 * Actor that maintains the client's view of the broker cluster topology.
 *
 * <p>A topology request is sent to a randomly chosen known broker
 *
 * <ul>
 *   <li>once when the actor has started,
 *   <li>on demand via {@link #requestTopology()}, but never more often than once per
 *       {@link #MIN_REFRESH_INTERVAL_MILLIS}, and
 *   <li>at the latest {@link #MAX_REFRESH_INTERVAL_MILLIS} after the previous request if no
 *       other refresh happened in between.
 * </ul>
 *
 * <p>The most recent topology is published through an {@link AtomicReference} and can be read
 * with {@link #getTopology()}. The mutable bookkeeping fields ({@code nextTopologyFutures},
 * {@code refreshAttempt}, {@code lastRefreshTime}) are only modified from jobs submitted
 * through {@code actor}.
 */
public class ClientTopologyManager extends Actor {
    /** Upper bound for the time between two topology requests (despite the name, a {@link Duration}). */
    public static final Duration MAX_REFRESH_INTERVAL_MILLIS = Duration.ofSeconds(10);
    /** Lower bound for the time between two topology requests (despite the name, a {@link Duration}). */
    public static final Duration MIN_REFRESH_INTERVAL_MILLIS = Duration.ofMillis(300);

    protected final ClientOutput output;
    protected final ClientTransport transport;
    /** Latest known topology; replaced as a whole whenever a new topology response arrives. */
    protected final AtomicReference<TopologyImpl> topology;
    protected final ControlMessageRequestHandler requestWriter;
    /** Futures handed out by {@link #requestTopology()} that wait for the next refresh result. */
    protected final List<CompletableActorFuture<Topology>> nextTopologyFutures = new ArrayList<>();
    protected final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
    protected final ErrorResponseDecoder errorResponseDecoder = new ErrorResponseDecoder();
    /** Counts sent topology requests; used to detect whether an idle refresh is still needed. */
    protected int refreshAttempt = 0;
    /** Clock time (ms) of the last topology request, or {@code -1} if none was sent yet. */
    protected long lastRefreshTime = -1;

    /**
     * Creates a manager whose initial topology consists only of the given contact point.
     *
     * @param clientTransport transport used to send topology requests and to register the
     *     broker addresses contained in topology responses
     * @param objectMapper mapper passed to the control message request handler
     * @param remoteAddress initial broker address the first topology is built from
     */
    public ClientTopologyManager(ClientTransport clientTransport, ObjectMapper objectMapper, RemoteAddress remoteAddress) {
        this.transport = clientTransport;
        this.output = clientTransport.getOutput();
        this.topology = new AtomicReference<>(new TopologyImpl(remoteAddress));
        this.requestWriter = new ControlMessageRequestHandler(objectMapper, new RequestTopologyCmdImpl(null, null));
    }

    /**
     * Triggers the first topology refresh. Presumably an {@code Actor} lifecycle hook invoked
     * once the actor has started (the base class is not visible here).
     */
    protected void onActorStarted() {
        this.actor.run(this::refreshTopology);
    }

    /**
     * Closes the underlying actor.
     *
     * @return the future returned by {@code actor.close()}
     */
    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    /**
     * @return the most recently stored topology, without triggering a refresh
     */
    public TopologyImpl getTopology() {
        return this.topology.get();
    }

    /**
     * Requests a fresh topology.
     *
     * <p>The returned future is completed with the next topology that arrives, or completed
     * exceptionally if the next refresh fails. A refresh is only scheduled when no other
     * caller is already waiting; later callers piggyback on the refresh that is under way.
     *
     * @return future for the next topology
     */
    public ActorFuture<Topology> requestTopology() {
        CompletableActorFuture<Topology> topologyFuture = new CompletableActorFuture<>();
        this.actor.call(() -> {
            // A non-empty list means a refresh has already been scheduled for earlier callers.
            boolean refreshAlreadyPending = !this.nextTopologyFutures.isEmpty();
            this.nextTopologyFutures.add(topologyFuture);
            if (!refreshAlreadyPending) {
                scheduleNextRefresh();
            }
        });
        return topologyFuture;
    }

    /**
     * Refreshes immediately if at least {@link #MIN_REFRESH_INTERVAL_MILLIS} has passed since
     * the last request; otherwise delays the refresh by the remaining time.
     */
    private void scheduleNextRefresh() {
        long minIntervalMillis = MIN_REFRESH_INTERVAL_MILLIS.toMillis();
        long millisSinceLastRefresh = ActorClock.currentTimeMillis() - this.lastRefreshTime;
        if (millisSinceLastRefresh >= minIntervalMillis) {
            refreshTopology();
        } else {
            this.actor.runDelayed(Duration.ofMillis(minIntervalMillis - millisSinceLastRefresh), this::refreshTopology);
        }
    }

    /**
     * Installs a topology obtained elsewhere as the current topology and completes all
     * pending topology futures with it.
     *
     * @param topologyResponse topology response to install
     */
    public void provideTopology(TopologyResponse topologyResponse) {
        this.actor.call(() -> onNewTopology(topologyResponse));
    }

    /**
     * Sends a topology request to a random broker of the current topology, registers the
     * response handler and schedules the idle refresh for this attempt.
     */
    private void refreshTopology() {
        RemoteAddress broker = this.topology.get().getRandomBroker();
        ActorFuture<ClientResponse> responseFuture = this.output.sendRequest(broker, this.requestWriter, Duration.ofSeconds(1L));
        this.refreshAttempt++;
        this.lastRefreshTime = ActorClock.currentTimeMillis();
        this.actor.runOnCompletion(responseFuture, this::handleResponse);
        this.actor.runDelayed(MAX_REFRESH_INTERVAL_MILLIS, scheduleIdleRefresh());
    }

    /**
     * Builds the job that performs the idle refresh for the current attempt.
     *
     * @return a job that triggers a refresh only if no other refresh was started since this
     *     method was called
     */
    private Runnable scheduleIdleRefresh() {
        // Snapshot the counter: if it changed by the time the job runs, a newer refresh has
        // already scheduled its own idle job and this one must do nothing.
        int attemptAtScheduling = this.refreshAttempt;
        return () -> {
            if (attemptAtScheduling == this.refreshAttempt) {
                this.actor.run(this::refreshTopology);
            }
        };
    }

    /**
     * Handles the outcome of a topology request.
     *
     * <p>On a transport failure, or when the response cannot be decoded, all pending topology
     * futures are completed exceptionally so that callers of {@link #requestTopology()} do not
     * wait forever. A decode failure is additionally rethrown to the caller of this callback.
     * The response is always closed.
     *
     * @param clientResponse the response; only used when {@code th} is {@code null}
     * @param th the request failure, or {@code null} on success
     */
    private void handleResponse(ClientResponse clientResponse, Throwable th) {
        if (th != null) {
            failRefreshFutures(th);
            return;
        }
        try {
            TopologyResponse topologyResponse;
            try {
                topologyResponse = decodeTopology(clientResponse.getResponseBuffer());
            } catch (RuntimeException decodeFailure) {
                // Without this the waiting futures would never be completed.
                failRefreshFutures(decodeFailure);
                throw decodeFailure;
            }
            onNewTopology(topologyResponse);
        } finally {
            clientResponse.close();
        }
    }

    /**
     * Replaces the current topology with one built from the response and completes all
     * pending topology futures.
     */
    private void onNewTopology(TopologyResponse topologyResponse) {
        // The bound method reference lets TopologyImpl register broker addresses with the transport.
        this.topology.set(new TopologyImpl(topologyResponse, this.transport::registerRemoteAddress));
        completeRefreshFutures();
    }

    /** Completes every pending topology future with the current topology and clears the list. */
    private void completeRefreshFutures() {
        Topology currentTopology = this.topology.get();
        for (CompletableActorFuture<Topology> pendingFuture : this.nextTopologyFutures) {
            pendingFuture.complete(currentTopology);
        }
        this.nextTopologyFutures.clear();
    }

    /** Completes every pending topology future exceptionally with {@code th} and clears the list. */
    private void failRefreshFutures(Throwable th) {
        for (CompletableActorFuture<Topology> pendingFuture : this.nextTopologyFutures) {
            pendingFuture.completeExceptionally("Could not refresh topology", th);
        }
        this.nextTopologyFutures.clear();
    }

    /**
     * Decodes a broker response into a topology response.
     *
     * @param directBuffer buffer holding the response, starting with the message header at
     *     offset 0
     * @return the decoded topology response
     * @throws BrokerErrorException if the broker answered with an error response
     * @throws RuntimeException if the response has an unexpected format or cannot be parsed
     */
    private TopologyResponse decodeTopology(DirectBuffer directBuffer) {
        this.messageHeaderDecoder.wrap(directBuffer, 0);
        int blockLength = this.messageHeaderDecoder.blockLength();
        int version = this.messageHeaderDecoder.version();
        int headerLength = this.messageHeaderDecoder.encodedLength();

        if (this.requestWriter.handlesResponse(this.messageHeaderDecoder)) {
            try {
                // The message body starts directly after the header.
                return (TopologyResponse) this.requestWriter.getResult(directBuffer, headerLength, blockLength, version);
            } catch (Exception e) {
                throw new RuntimeException("Unable to parse topology from broker response", e);
            }
        }

        // NOTE(review): schema 0 / template 0 is presumably the error response message — confirm
        // against ErrorResponseDecoder.
        int schemaId = this.messageHeaderDecoder.schemaId();
        int templateId = this.messageHeaderDecoder.templateId();
        if (schemaId != 0 || templateId != 0) {
            throw new RuntimeException(String.format("Unexpected response format. Schema %s and template %s.", schemaId, templateId));
        }
        this.errorResponseDecoder.wrap(directBuffer, 0, blockLength, version);
        throw new BrokerErrorException(this.errorResponseDecoder.errorCode(), this.errorResponseDecoder.errorData());
    }
}
