package io.camunda.zeebe.gateway.impl.broker;

import io.atomix.cluster.ClusterMembershipEvent;
import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.messaging.ClusterEventService;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.Subscription;
import io.camunda.zeebe.gateway.Loggers;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManager;
import io.camunda.zeebe.gateway.impl.broker.cluster.BrokerTopologyManagerImpl;
import io.camunda.zeebe.gateway.impl.broker.request.BrokerRequest;
import io.camunda.zeebe.gateway.impl.broker.response.BrokerResponse;
import io.camunda.zeebe.gateway.impl.configuration.ClusterCfg;
import io.camunda.zeebe.gateway.impl.configuration.GatewayCfg;
import io.camunda.zeebe.transport.impl.AtomixClientTransportAdapter;
import io.camunda.zeebe.util.exception.UncheckedExecutionException;
import io.camunda.zeebe.util.sched.ActorScheduler;
import io.camunda.zeebe.util.sched.clock.ActorClock;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;

/* loaded from: input_file:io/camunda/zeebe/gateway/impl/broker/BrokerClientImpl.class */
/**
 * {@link BrokerClient} implementation that wires a {@link BrokerTopologyManagerImpl}, an {@link
 * AtomixClientTransportAdapter} and a {@link BrokerRequestManager} onto an {@link ActorScheduler}
 * and delegates all request methods to the request manager.
 *
 * <p>NOTE(review): {@link #close()} and {@link #subscribeJobAvailableNotification(String,
 * Consumer)} read and write plain (non-volatile, unsynchronized) fields; callers presumably invoke
 * them from a single thread - confirm before using this class concurrently.
 */
public final class BrokerClientImpl implements BrokerClient {
    public static final Logger LOG = Loggers.GATEWAY_LOGGER;
    private static final String ERROR_MSG_STOP_FAILED = "Failed to gracefully shutdown gateway broker client";
    private final ActorScheduler actorScheduler;
    private final BrokerTopologyManagerImpl topologyManager;
    /** Whether this client started the scheduler and therefore has to stop it on close. */
    private final boolean ownsActorScheduler;
    private final BrokerRequestManager requestManager;
    private boolean isClosed;
    /** Most recent subscription created by {@link #subscribeJobAvailableNotification}; may be null. */
    private Subscription jobAvailableSubscription;
    private final ClusterEventService eventService;

    /**
     * Creates a client that builds and owns its own actor scheduler, passing a {@code null} actor
     * clock to the scheduler builder.
     *
     * @param gatewayCfg gateway configuration (thread count and cluster settings are read from it)
     * @param messagingService messaging service handed to the transport adapter
     * @param clusterMembershipService source of cluster members and membership events
     * @param clusterEventService event service used for job-available subscriptions
     */
    public BrokerClientImpl(GatewayCfg gatewayCfg, MessagingService messagingService, ClusterMembershipService clusterMembershipService, ClusterEventService clusterEventService) {
        this(gatewayCfg, messagingService, clusterMembershipService, clusterEventService, (ActorClock) null);
    }

    /**
     * Creates a client that builds and owns its own actor scheduler named {@code
     * "gateway-scheduler"}, with the configured number of management threads as CPU-bound threads
     * and zero IO-bound threads.
     *
     * @param gatewayCfg gateway configuration (thread count and cluster settings are read from it)
     * @param messagingService messaging service handed to the transport adapter
     * @param clusterMembershipService source of cluster members and membership events
     * @param clusterEventService event service used for job-available subscriptions
     * @param actorClock clock passed to the scheduler builder; may be {@code null} (presumably the
     *     scheduler then uses its default clock - confirm against {@code ActorScheduler})
     */
    public BrokerClientImpl(GatewayCfg gatewayCfg, MessagingService messagingService, ClusterMembershipService clusterMembershipService, ClusterEventService clusterEventService, ActorClock actorClock) {
        this(gatewayCfg, messagingService, clusterMembershipService, clusterEventService, ActorScheduler.newActorScheduler().setCpuBoundActorThreadCount(gatewayCfg.getThreads().getManagementThreads()).setIoBoundActorThreadCount(0).setActorClock(actorClock).setSchedulerName("gateway-scheduler").build(), true);
    }

    /**
     * Creates a client on top of the given scheduler and submits the topology manager, transport
     * adapter and request manager to it.
     *
     * <p>All mandatory arguments are validated before the scheduler is started, so a failed
     * validation never leaves a started scheduler behind.
     *
     * @param gatewayCfg gateway configuration; its cluster section supplies the request timeout
     * @param messagingService messaging service handed to the transport adapter
     * @param clusterMembershipService source of cluster members and membership events
     * @param clusterEventService event service used for job-available subscriptions
     * @param actorScheduler scheduler on which the internal actors run
     * @param z {@code true} if this client owns the scheduler, i.e. starts it here and stops it in
     *     {@link #close()}
     * @throws NullPointerException if {@code gatewayCfg}, its cluster configuration, {@code
     *     clusterMembershipService} or {@code actorScheduler} is {@code null}
     */
    public BrokerClientImpl(GatewayCfg gatewayCfg, MessagingService messagingService, ClusterMembershipService clusterMembershipService, ClusterEventService clusterEventService, ActorScheduler actorScheduler, boolean z) {
        // Fail fast: every check below would otherwise surface as an NPE only after the
        // scheduler has already been started.
        Objects.requireNonNull(gatewayCfg, "gatewayCfg must not be null");
        Objects.requireNonNull(clusterMembershipService, "clusterMembershipService must not be null");
        Objects.requireNonNull(actorScheduler, "actorScheduler must not be null");
        ClusterCfg cluster = Objects.requireNonNull(gatewayCfg.getCluster(), "gatewayCfg.getCluster() must not be null");

        this.eventService = clusterEventService;
        this.actorScheduler = actorScheduler;
        this.ownsActorScheduler = z;
        if (z) {
            actorScheduler.start();
        }

        this.topologyManager = new BrokerTopologyManagerImpl(clusterMembershipService::getMembers);
        actorScheduler.submitActor(this.topologyManager);
        clusterMembershipService.addListener(this.topologyManager);
        // Replay the members that are already known as MEMBER_ADDED events so the topology
        // manager also sees members that joined before the listener was registered.
        clusterMembershipService.getMembers().forEach(member -> {
            this.topologyManager.event(new ClusterMembershipEvent(ClusterMembershipEvent.Type.MEMBER_ADDED, member));
        });

        AtomixClientTransportAdapter atomixClientTransportAdapter = new AtomixClientTransportAdapter(messagingService);
        actorScheduler.submitActor(atomixClientTransportAdapter);
        this.requestManager = new BrokerRequestManager(atomixClientTransportAdapter, this.topologyManager, new RoundRobinDispatchStrategy(this.topologyManager), cluster.getRequestTimeout());
        actorScheduler.submitActor(this.requestManager);
    }

    /**
     * Closes the topology manager and the tracked job-available subscription (if any) and, when
     * this client owns the actor scheduler, stops the scheduler, waiting up to 15 seconds.
     *
     * <p>Subsequent calls are no-ops. Failures while closing the topology manager or the
     * subscription are logged and ignored so that the scheduler is still stopped.
     *
     * @throws UncheckedExecutionException if stopping the owned scheduler fails, times out or is
     *     interrupted (the interrupt flag is restored in the latter case)
     */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient, java.lang.AutoCloseable
    public void close() {
        if (this.isClosed) {
            return;
        }
        this.isClosed = true;
        LOG.debug("Closing gateway broker client ...");
        doAndLogException(this.topologyManager::close);
        LOG.debug("topology manager closed");
        if (this.jobAvailableSubscription != null) {
            // Guarded like the topology manager: a failing subscription close must not prevent
            // the owned scheduler from being stopped below.
            doAndLogException(this.jobAvailableSubscription::close);
        }
        if (this.ownsActorScheduler) {
            try {
                this.actorScheduler.stop().get(15L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new UncheckedExecutionException(ERROR_MSG_STOP_FAILED, e);
            } catch (ExecutionException | TimeoutException e2) {
                throw new UncheckedExecutionException(ERROR_MSG_STOP_FAILED, e2);
            }
        }
        LOG.debug("Gateway broker client closed.");
    }

    /** Delegates to {@link BrokerRequestManager#sendRequest(BrokerRequest)}. */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public <T> CompletableFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest) {
        return this.requestManager.sendRequest(brokerRequest);
    }

    /** Delegates to the request manager's {@code sendRequest}, passing {@code duration} through. */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public <T> CompletableFuture<BrokerResponse<T>> sendRequest(BrokerRequest<T> brokerRequest, Duration duration) {
        return this.requestManager.sendRequest(brokerRequest, duration);
    }

    /** Delegates to {@link BrokerRequestManager#sendRequestWithRetry(BrokerRequest)}. */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public <T> CompletableFuture<BrokerResponse<T>> sendRequestWithRetry(BrokerRequest<T> brokerRequest) {
        return this.requestManager.sendRequestWithRetry(brokerRequest);
    }

    /**
     * Delegates to the request manager's {@code sendRequestWithRetry}, passing {@code duration}
     * through.
     */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public <T> CompletableFuture<BrokerResponse<T>> sendRequestWithRetry(BrokerRequest<T> brokerRequest, Duration duration) {
        return this.requestManager.sendRequestWithRetry(brokerRequest, duration);
    }

    /**
     * Sends the request with retry and routes the outcome to one of the two callbacks.
     *
     * @param brokerRequest request to send
     * @param brokerResponseConsumer receives the response key and payload when the future
     *     completes without a throwable
     * @param consumer receives the throwable when the future completes exceptionally
     */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public <T> void sendRequestWithRetry(BrokerRequest<T> brokerRequest, BrokerResponseConsumer<T> brokerResponseConsumer, Consumer<Throwable> consumer) {
        this.requestManager.sendRequestWithRetry(brokerRequest).whenComplete((brokerResponse, th) -> {
            if (th == null) {
                brokerResponseConsumer.accept(brokerResponse.getKey(), brokerResponse.getResponse());
            } else {
                consumer.accept(th);
            }
        });
    }

    /** Returns the topology manager created by this client. */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public BrokerTopologyManager getTopologyManager() {
        return this.topologyManager;
    }

    /**
     * Subscribes {@code consumer} to the given event topic and blocks until the subscription has
     * been established.
     *
     * <p>Only the subscription created by the most recent call is tracked and closed by {@link
     * #close()}; NOTE(review): a subscription from an earlier call is overwritten here and never
     * closed by this class - confirm that callers subscribe at most once.
     *
     * @param str topic to subscribe to
     * @param consumer invoked with each received message, cast to {@link String}
     */
    @Override // io.camunda.zeebe.gateway.impl.broker.BrokerClient
    public void subscribeJobAvailableNotification(String str, Consumer<String> consumer) {
        this.jobAvailableSubscription = (Subscription) this.eventService.subscribe(str, message -> {
            consumer.accept((String) message);
            return CompletableFuture.completedFuture(null);
        }).join();
    }

    /**
     * Runs the given close action and logs, rather than propagates, any {@link Exception} it
     * throws, so that the remaining shutdown steps still execute.
     */
    private void doAndLogException(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            LOG.error("Exception when closing client. Ignoring", e);
        }
    }
}
