package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.event.EventMetadata;
import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.event.impl.TopicSubscriber;
import io.zeebe.client.event.impl.TopicSubscriberGroup;
import io.zeebe.client.event.impl.TopicSubscriptionSpec;
import io.zeebe.client.impl.Loggers;
import io.zeebe.client.impl.ZeebeClientImpl;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.transport.ClientInputMessageSubscription;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.TransportListener;
import io.zeebe.util.sched.Actor;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.agrona.ErrorHandler;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentRunner;
import org.agrona.concurrent.BackoffIdleStrategy;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.AtomicCounter;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/SubscriptionManager.class */
public class SubscriptionManager extends Actor implements SubscribedEventHandler, TransportListener {
    protected static final Logger LOGGER = Loggers.SUBSCRIPTION_LOGGER;
    protected final ZeebeClientImpl client;
    private final EventSubscribers taskSubscribers = new EventSubscribers();
    private final EventSubscribers topicSubscribers = new EventSubscribers();
    final IdleStrategy idleStrategy = new BackoffIdleStrategy(1000, 100, 1, TimeUnit.MILLISECONDS.toNanos(1));
    final ErrorHandler errorHandler = (v0) -> {
        v0.printStackTrace();
    };
    private final List<AgentRunner> agentRunners = new ArrayList();
    private boolean isClosing = false;
    private ClientInputMessageSubscription incomingEventSubscription;

    public SubscriptionManager(ZeebeClientImpl zeebeClientImpl) {
        this.client = zeebeClientImpl;
    }

    protected void onActorStarting() {
        this.actor.runOnCompletion(this.client.getTransport().openSubscription("event-acquisition", new SubscribedEventCollector(this, this.client.getMsgPackConverter())), (clientInputMessageSubscription, th) -> {
            this.incomingEventSubscription = clientInputMessageSubscription;
        });
    }

    protected void onActorStarted() {
        this.actor.consume(this.incomingEventSubscription, () -> {
            if (this.incomingEventSubscription.poll() == 0) {
                this.actor.yield();
            }
        });
        startSubscriptionExecution(this.client.getNumExecutionThreads());
    }

    private void startSubscriptionExecution(int i) {
        for (int i2 = 0; i2 < i; i2++) {
            AgentRunner initAgentRunner = initAgentRunner(new SubscriptionExecutor(this.topicSubscribers, this.taskSubscribers));
            AgentRunner.startOnThread(initAgentRunner);
            this.agentRunners.add(initAgentRunner);
        }
    }

    private void stopSubscriptionExecution() {
        Iterator<AgentRunner> it = this.agentRunners.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    private AgentRunner initAgentRunner(Agent agent) {
        return new AgentRunner(this.idleStrategy, this.errorHandler, (AtomicCounter) null, agent);
    }

    protected void onActorClosing() {
        closeAllSubscribers("Subscription manager shutdown");
        stopSubscriptionExecution();
    }

    protected void onActorCloseRequested() {
        this.isClosing = true;
    }

    public boolean isClosing() {
        return this.isClosing;
    }

    public ActorFuture<TopicSubscriberGroup> openTopicSubscription(TopicSubscriptionSpec topicSubscriptionSpec) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(() -> {
            TopicSubscriberGroup topicSubscriberGroup = new TopicSubscriberGroup(this.actor, this.client, this, topicSubscriptionSpec);
            this.topicSubscribers.addGroup(topicSubscriberGroup);
            topicSubscriberGroup.open(completableActorFuture);
        });
        return completableActorFuture;
    }

    public ActorFuture<TaskSubscriberGroup> openTaskSubscription(TaskSubscriptionSpec taskSubscriptionSpec) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(() -> {
            TaskSubscriberGroup taskSubscriberGroup = new TaskSubscriberGroup(this.actor, this.client, this, taskSubscriptionSpec);
            this.taskSubscribers.addGroup(taskSubscriberGroup);
            taskSubscriberGroup.open(completableActorFuture);
        });
        return completableActorFuture;
    }

    public void addSubscriber(Subscriber subscriber) {
        if (subscriber instanceof TopicSubscriber) {
            this.topicSubscribers.add(subscriber);
        } else {
            this.taskSubscribers.add(subscriber);
        }
    }

    public void removeSubscriber(Subscriber subscriber) {
        if (subscriber instanceof TopicSubscriber) {
            this.topicSubscribers.remove(subscriber);
        } else {
            this.taskSubscribers.remove(subscriber);
        }
    }

    public void closeAllSubscribers(String str) {
        this.topicSubscribers.closeAllGroups(str);
        this.taskSubscribers.closeAllGroups(str);
    }

    public ActorFuture<Void> reopenSubscriptionsForRemoteAsync(RemoteAddress remoteAddress) {
        return this.actor.call(() -> {
            this.topicSubscribers.reopenSubscribersForRemote(remoteAddress);
            this.taskSubscribers.reopenSubscribersForRemote(remoteAddress);
        });
    }

    public ActorFuture<Void> closeGroup(SubscriberGroup<?> subscriberGroup, String str) {
        CompletableActorFuture completableActorFuture = new CompletableActorFuture();
        this.actor.call(() -> {
            subscriberGroup.listenForClose(completableActorFuture);
            subscriberGroup.initClose(str, null);
        });
        return completableActorFuture;
    }

    @Override // io.zeebe.client.task.impl.subscription.SubscribedEventHandler
    public boolean onEvent(SubscriptionType subscriptionType, long j, GeneralEventImpl generalEventImpl) {
        EventMetadata metadata = generalEventImpl.getMetadata();
        EventSubscribers eventSubscribers = subscriptionType == SubscriptionType.TASK_SUBSCRIPTION ? this.taskSubscribers : subscriptionType == SubscriptionType.TOPIC_SUBSCRIPTION ? this.topicSubscribers : null;
        Subscriber subscriber = null;
        if (eventSubscribers != null) {
            int partitionId = metadata.getPartitionId();
            subscriber = eventSubscribers.getSubscriber(partitionId, j);
            if (subscriber == null && eventSubscribers.isAnySubscriberOpeningOn(partitionId)) {
                return false;
            }
        }
        if (subscriber == null || !subscriber.isOpen()) {
            LOGGER.debug("Ignoring event event {} for subscription [type={}, partition={}, key={}]", new Object[]{generalEventImpl, subscriptionType, Integer.valueOf(generalEventImpl.getMetadata().getPartitionId()), Long.valueOf(j)});
            return true;
        }
        generalEventImpl.setTopicName(subscriber.getTopicName());
        return subscriber.addEvent(generalEventImpl);
    }

    public ActorFuture<Void> close() {
        return this.actor.close();
    }

    public void onConnectionEstablished(RemoteAddress remoteAddress) {
    }

    public void onConnectionClosed(RemoteAddress remoteAddress) {
        reopenSubscriptionsForRemoteAsync(remoteAddress);
    }
}
