package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.cmd.ClientException;
import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.impl.Loggers;
import io.zeebe.client.impl.ZeebeClientImpl;
import io.zeebe.client.task.impl.subscription.Subscriber;
import io.zeebe.client.topic.Topic;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.ActorControl;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import org.agrona.collections.Int2ObjectHashMap;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/SubscriberGroup.class */
/**
 * A group of subscribers that together serve one subscription on a single topic.
 *
 * <p>When the group is opened, the topic's partitions are requested and one subscriber is opened
 * per partition. The group tracks the subscription state of every partition in
 * {@link #subscriberState} and moves through the lifecycle
 * {@code OPENING -> OPEN -> CLOSING -> CLOSED}; a failure while requesting the partitions jumps
 * straight to {@code CLOSED} (see {@link #doAbort(String, Throwable)}).
 *
 * <p>NOTE(review): apart from {@link #close()} and {@link #closeAsync()}, the state-changing
 * methods look like they are meant to be invoked from within the owning actor - confirm against
 * the callers in {@code SubscriptionManager}.
 *
 * @param <T> the concrete subscriber type managed by this group
 */
public abstract class SubscriberGroup<T extends Subscriber> {
    private static final int STATE_OPENING = 0;
    private static final int STATE_OPEN = 1;
    private static final int STATE_CLOSING = 2;
    private static final int STATE_CLOSED = 3;

    protected final ActorControl actor;
    protected final ZeebeClientImpl client;
    protected final String topic;
    protected final SubscriptionManager subscriptionManager;

    /** Future handed to {@link #open}; reset to {@code null} once it has been completed. */
    protected CompletableActorFuture<SubscriberGroup<T>> openFuture;

    /** First close reason that was recorded; later reasons are ignored (see setCloseReason). */
    private String closeReason;
    /** Cause that was recorded together with {@link #closeReason}; may be {@code null}. */
    private Throwable closeCause;

    /** Subscription state per partition, keyed by partition id. */
    protected final Int2ObjectHashMap<SubscriberState> subscriberState = new Int2ObjectHashMap<>();
    /** Subscribers that are currently active (opened and not yet disabled/removed). */
    protected final List<T> subscribersList = new CopyOnWriteArrayList<>();
    /** Futures that are completed as soon as the group reaches the closed state. */
    protected List<CompletableActorFuture<Void>> closeFutures = new ArrayList<>();

    // NOTE(review): volatile presumably because isOpen()/isClosed() are read from threads other
    // than the actor's - confirm.
    private volatile int state = STATE_OPENING;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/SubscriberGroup$SubscriberState.class */
    /** Subscription state of a single partition within the group. */
    public enum SubscriberState {
        NOT_SUBSCRIBED,
        UNSUBSCRIBING,
        SUBSCRIBING,
        SUBSCRIBED
    }

    /**
     * Creates a group in the opening state; no request is sent until {@link #open} is called.
     *
     * @param actorControl control of the actor used to schedule the group's asynchronous work
     * @param zeebeClientImpl client used to look up the topic's partitions
     * @param subscriptionManager manager that the group's subscribers are registered with
     * @param str name of the topic to subscribe to
     */
    public SubscriberGroup(ActorControl actorControl, ZeebeClientImpl zeebeClientImpl, SubscriptionManager subscriptionManager, String str) {
        this.actor = actorControl;
        this.subscriptionManager = subscriptionManager;
        this.client = zeebeClientImpl;
        this.topic = str;
    }

    /**
     * Closes the group immediately, without closing any subscribers, and fails a pending open
     * future.
     *
     * @param str reason for the abort
     * @param th cause of the abort, may be {@code null}
     */
    protected void doAbort(String str, Throwable th) {
        setCloseReason(str, th);
        this.state = STATE_CLOSED;
        onGroupClosed();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts opening the group: requests the known topics and opens one subscriber for every
     * partition of this group's topic. The group is aborted if the request fails or if the topic
     * is not among the returned topics.
     *
     * @param completableActorFuture future that is completed with this group once it is open, or
     *     exceptionally if the group is closed before it could be opened
     */
    public void open(CompletableActorFuture<SubscriberGroup<T>> completableActorFuture) {
        this.openFuture = completableActorFuture;
        this.actor.runOnCompletion(this.client.topics().getTopics().mo10executeAsync(), (topics, th) -> {
            if (th != null) {
                doAbort("Requesting partitions failed", th);
                return;
            }
            Optional<Topic> requestedTopic = topics.getTopics().stream().filter(candidate -> {
                return this.topic.equals(candidate.getName());
            }).findFirst();
            if (!requestedTopic.isPresent()) {
                doAbort("Topic " + this.topic + " is not known", null);
                return;
            }
            // NOTE(review): if the topic reports no partitions, no subscriber is opened and the
            // open future is never completed from here - confirm this cannot happen.
            requestedTopic.get().getPartitions().forEach(partition -> {
                openSubscriber(partition.getId());
            });
        });
    }

    /**
     * Registers a future that is completed when the group is closed. If the group is already
     * closed, the future is completed right away.
     *
     * @param completableActorFuture future to complete (with {@code null}) on close
     */
    public void listenForClose(CompletableActorFuture<Void> completableActorFuture) {
        if (this.state == STATE_CLOSED) {
            completableActorFuture.complete(null);
        } else {
            this.closeFutures.add(completableActorFuture);
        }
    }

    /**
     * Begins closing an open group by closing all of its active subscribers. Does nothing unless
     * the group is currently open; a close requested while the group is still opening is picked
     * up in onGroupOpen via the registered close futures.
     *
     * @param str reason for closing
     * @param th cause for closing, may be {@code null}
     */
    public void initClose(String str, Throwable th) {
        if (this.state != STATE_OPEN) {
            return;
        }
        this.state = STATE_CLOSING;
        // Forward the cause as well; it used to be dropped and replaced by null.
        setCloseReason(str, th);
        if (checkGroupClosed()) {
            // No partition is subscribed any more, so there is nothing left to close.
            return;
        }
        this.subscribersList.forEach(subscriber -> {
            closeSubscriber(subscriber);
        });
    }

    /**
     * Runs once the group has reached the closed state: fails a still-pending open future with
     * the recorded close reason and cause, then completes all registered close futures.
     */
    private void onGroupClosed() {
        if (this.openFuture != null) {
            this.openFuture.completeExceptionally(new RuntimeException("Could not open subscriber group " + describeGroup() + ". " + this.closeReason, this.closeCause));
            this.openFuture = null;
        }
        resolveCloseFutures();
    }

    /** Completes every registered close future with {@code null} and forgets them. */
    private void resolveCloseFutures() {
        this.closeFutures.forEach(closeFuture -> {
            closeFuture.complete(null);
        });
        this.closeFutures.clear();
    }

    /**
     * Runs once every partition has resolved to either subscribed or not subscribed. If any
     * partition could not be subscribed the group is closed again (which later fails the open
     * future); otherwise the open future is completed and, if a close was requested in the
     * meantime, closing is started.
     */
    private void onGroupOpen() {
        if (!allPartitionsSubscribed()) {
            initClose("Could not subscribe to all partitions", null);
            return;
        }
        if (this.openFuture != null) {
            this.openFuture.complete(this);
            this.openFuture = null;
        }
        if (!this.closeFutures.isEmpty()) {
            // Somebody asked for a close while the group was still opening.
            initClose("Close requested", null);
        }
    }

    /**
     * Requests closing of this group through the subscription manager.
     *
     * @return the future returned by {@code SubscriptionManager.closeGroup}
     */
    public ActorFuture<Void> closeAsync() {
        return this.subscriptionManager.closeGroup(this, "Close requested");
    }

    /**
     * Drops every subscriber whose event source equals the given remote address and, if the
     * group is still open, opens a new subscriber for the same partition.
     *
     * <p>Removing while iterating is safe here because {@link #subscribersList} is a
     * {@link CopyOnWriteArrayList}, whose iterators work on a snapshot.
     *
     * @param remoteAddress address of the remote whose subscriptions have to be reopened
     */
    public void reopenSubscriptionsForRemoteAsync(RemoteAddress remoteAddress) {
        for (T subscriber : this.subscribersList) {
            if (!subscriber.getEventSource().equals(remoteAddress)) {
                continue;
            }
            subscriber.disable();
            this.subscribersList.remove(subscriber);
            onSubscriberClosed(subscriber);
            if (this.state == STATE_OPEN) {
                openSubscriber(subscriber.getPartitionId());
            }
        }
    }

    /**
     * Closes the group and blocks until closing has finished.
     *
     * @throws ClientException if waiting for the close fails for any reason
     */
    public void close() {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw new ClientException("Exception while closing subscription", e);
        }
    }

    /**
     * Marks the partition as subscribing and requests a new subscriber for it; the outcome is
     * handled by onSubscriberOpened or onSubscriberOpenFailed.
     */
    private void openSubscriber(int partitionId) {
        this.subscriberState.put(partitionId, SubscriberState.SUBSCRIBING);
        this.actor.runOnCompletionBlockingCurrentPhase(requestNewSubscriber(partitionId), (creationResult, th) -> {
            if (th == null) {
                onSubscriberOpened(creationResult);
            } else {
                onSubscriberOpenFailed(partitionId, th);
            }
        });
    }

    /**
     * Disables and removes the subscriber, marks its partition as unsubscribing and requests the
     * subscription close once the subscriber has no more events in processing. A failing close
     * request is logged and otherwise ignored.
     */
    private void closeSubscriber(T subscriber) {
        subscriber.disable();
        this.subscribersList.remove(subscriber);
        this.subscriberState.put(subscriber.getPartitionId(), SubscriberState.UNSUBSCRIBING);
        // NOTE(review): runUntilDone presumably re-runs this job until done() is called, so the
        // yield branch acts as "try again later" - confirm against the actor scheduler.
        this.actor.runUntilDone(() -> {
            if (subscriber.hasEventsInProcessing()) {
                this.actor.yield();
                return;
            }
            this.actor.runOnCompletionBlockingCurrentPhase(doCloseSubscriber(subscriber), (ignored, th) -> {
                if (th != null) {
                    Loggers.SUBSCRIPTION_LOGGER.error("Could not close subscriber. Ignoring.", th);
                }
                onSubscriberClosed(subscriber);
            });
            this.actor.done();
        });
    }

    /**
     * Requests the close of the subscriber's subscription. Subclasses may override this to close
     * subscribers differently.
     *
     * @param t subscriber to close
     * @return future of the close request
     */
    protected ActorFuture<Void> doCloseSubscriber(T t) {
        return t.requestSubscriptionClose();
    }

    /** Records the close reason and cause, unless a reason has already been recorded. */
    private void setCloseReason(String reason, Throwable cause) {
        if (this.closeReason == null) {
            this.closeReason = reason;
            this.closeCause = cause;
        }
    }

    /**
     * Marks the partition as not subscribed and re-evaluates the group state; if that neither
     * opens nor closes the group and the group is not already closing, closing is initiated.
     */
    private void onSubscriberOpenFailed(int partitionId, Throwable cause) {
        this.subscriberState.put(partitionId, SubscriberState.NOT_SUBSCRIBED);
        if (checkGroupOpen() || checkGroupClosed() || this.state == STATE_CLOSING) {
            return;
        }
        initClose("Could not subscribe to partition", cause);
    }

    /**
     * Builds the condition that triggers replenishing the subscriber's event source. While the
     * subscription manager is closing, a condition whose signal does nothing is returned instead.
     * If replenishing fails, closing of the group is initiated.
     *
     * @param t subscriber whose event source is replenished when the condition is signalled
     * @return the replenishment condition
     */
    public ActorCondition buildReplenishmentTrigger(T t) {
        if (this.subscriptionManager.isClosing()) {
            return new ActorCondition() { // from class: io.zeebe.client.task.impl.subscription.SubscriberGroup.1
                public void signal() {
                    // intentionally a no-op
                }
            };
        }
        return this.actor.onCondition(this.topic, () -> {
            this.actor.runOnCompletion(t.replenishEventSource(), (obj, th) -> {
                if (th != null) {
                    initClose("Could not replenish event source (submit ack or credits)", th);
                }
            });
        });
    }

    /**
     * Builds and registers the subscriber for a successfully created subscription and
     * re-evaluates whether the group is now open. A subscriber that arrives while the group is
     * already closing is closed again right away.
     */
    private void onSubscriberOpened(EventSubscriptionCreationResult creationResult) {
        T subscriber = buildSubscriber(creationResult);
        this.subscriberState.put(subscriber.getPartitionId(), SubscriberState.SUBSCRIBED);
        this.subscribersList.add(subscriber);
        this.subscriptionManager.addSubscriber(subscriber);
        boolean groupOpened = checkGroupOpen();
        // If this subscriber completed the opening and the group went straight on to closing,
        // initClose has already closed every listed subscriber including this one; closing it
        // here as well would issue a second close request for the same subscription.
        if (!groupOpened && this.state == STATE_CLOSING) {
            closeSubscriber(subscriber);
        }
    }

    /**
     * Deregisters the subscriber, marks its partition as not subscribed and checks whether the
     * group has thereby finished closing.
     */
    private void onSubscriberClosed(T subscriber) {
        this.subscriptionManager.removeSubscriber(subscriber);
        this.subscriberState.put(subscriber.getPartitionId(), SubscriberState.NOT_SUBSCRIBED);
        checkGroupClosed();
    }

    /**
     * Moves a closing group to the closed state once no partition is subscribed any more.
     *
     * @return {@code true} if this call closed the group
     */
    private boolean checkGroupClosed() {
        if (this.state != STATE_CLOSING || !allPartitionsNotSubscribed()) {
            return false;
        }
        this.state = STATE_CLOSED;
        onGroupClosed();
        return true;
    }

    /**
     * Moves an opening group to the open state once every partition has resolved.
     *
     * @return {@code true} if this call opened the group (it may already be closing again when
     *     this method returns, see onGroupOpen)
     */
    private boolean checkGroupOpen() {
        if (this.state != STATE_OPENING || !allPartitionsResolved()) {
            return false;
        }
        this.state = STATE_OPEN;
        onGroupOpen();
        return true;
    }

    /** Whether every tracked partition is in state SUBSCRIBED. */
    private boolean allPartitionsSubscribed() {
        return allPartitionsInSubscriberState(partitionState -> {
            return partitionState == SubscriberState.SUBSCRIBED;
        });
    }

    /** Whether every tracked partition is in one of the settled states SUBSCRIBED or NOT_SUBSCRIBED. */
    private boolean allPartitionsResolved() {
        return allPartitionsInSubscriberState(partitionState -> {
            return partitionState == SubscriberState.SUBSCRIBED || partitionState == SubscriberState.NOT_SUBSCRIBED;
        });
    }

    /** Whether every tracked partition is in state NOT_SUBSCRIBED. */
    private boolean allPartitionsNotSubscribed() {
        return allPartitionsInSubscriberState(partitionState -> {
            return partitionState == SubscriberState.NOT_SUBSCRIBED;
        });
    }

    /** Whether the state of every tracked partition satisfies the predicate (true if none are tracked). */
    private boolean allPartitionsInSubscriberState(Predicate<SubscriberState> predicate) {
        return this.subscriberState.values().stream().allMatch(predicate);
    }

    /**
     * @param i partition id
     * @return {@code true} if a subscriber for the partition is currently being opened
     */
    public boolean isSubscribingTo(int i) {
        return this.subscriberState.get(i) == SubscriberState.SUBSCRIBING;
    }

    /**
     * @return a description of this group; it is embedded in the error message used when the
     *     group could not be opened
     */
    protected abstract String describeGroup();

    /**
     * Polls every active subscriber with the given consumer.
     *
     * @param checkedConsumer consumer passed on to each subscriber's {@code pollEvents}
     * @return the sum of the values returned by the subscribers
     */
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        int total = 0;
        for (T subscriber : this.subscribersList) {
            total += subscriber.pollEvents(checkedConsumer);
        }
        return total;
    }

    /** @return {@code true} while the group is in the open state */
    public boolean isOpen() {
        return this.state == STATE_OPEN;
    }

    /** @return {@code true} once the group has reached the closed state */
    public boolean isClosed() {
        return this.state == STATE_CLOSED;
    }

    /** @return the sum of {@code size()} over all active subscribers */
    public int size() {
        int total = 0;
        for (T subscriber : this.subscribersList) {
            total += subscriber.size();
        }
        return total;
    }

    /** @return the number of currently active subscribers */
    public int numActiveSubscribers() {
        return this.subscribersList.size();
    }

    /**
     * Polls this group; the behaviour and the meaning of the result are defined by the subclass.
     *
     * @return an implementation-defined count (presumably the number of handled events - confirm)
     */
    public abstract int poll();

    /**
     * Requests a new subscription for the given partition.
     *
     * @param i partition id
     * @return future that yields the creation result of the subscription
     */
    protected abstract ActorFuture<? extends EventSubscriptionCreationResult> requestNewSubscriber(int i);

    /**
     * Creates the subscriber for a successfully created subscription.
     *
     * @param eventSubscriptionCreationResult result of the subscription request
     * @return the new subscriber
     */
    protected abstract T buildSubscriber(EventSubscriptionCreationResult eventSubscriptionCreationResult);

    /** @return whether this is a managed group; the exact meaning is defined by the subclasses */
    public abstract boolean isManagedGroup();
}
