package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.impl.Loggers;
import io.zeebe.client.task.impl.subscription.EventSubscription;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.WaitState;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.concurrent.ManyToManyConcurrentArrayQueue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription.class */
public abstract class EventSubscription<T extends EventSubscription<T>> {
    protected static final Logger LOGGER = Loggers.SUBSCRIPTION_LOGGER;
    protected static final double REPLENISHMENT_THRESHOLD = 0.3d;
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_OPEN = 1;
    protected static final int TRANSITION_REOPEN = 2;
    protected static final int TRANSITION_ABORT = 3;
    protected static final int TRANSITION_CLOSE = 4;
    protected long subscriberKey;
    protected final ManyToManyConcurrentArrayQueue<GeneralEventImpl> pendingEvents;
    protected final int capacity;
    protected final EventAcquisition<T> acquisition;
    protected RemoteAddress eventSource;
    protected final String topic;
    protected final int partitionId;
    protected CompletableFuture<T> openFuture;
    protected CompletableFuture<T> closeFuture;
    protected final EventSubscription<T>.InitState initState = new InitState();
    protected final EventSubscription<T>.OpeningState openingState = new OpeningState();
    protected final EventSubscription<T>.OpenState openState = new OpenState();
    protected final EventSubscription<T>.ClosingState closingState = new ClosingState();
    protected final EventSubscription<T>.ClosedState closedState = new ClosedState();
    private final StateMachine<SimpleStateMachineContext> stateMachine = StateMachine.builder(stateMachine -> {
        return new SimpleStateMachineContext(stateMachine);
    }).initialState(this.initState).from(this.initState).take(TRANSITION_OPEN).to(this.openingState).from(this.openingState).take(TRANSITION_DEFAULT).to(this.openState).from(this.openingState).take(3).to(this.closedState).from(this.openState).take(TRANSITION_REOPEN).to(this.openingState).from(this.openState).take(3).to(this.closedState).from(this.openState).take(TRANSITION_CLOSE).to(this.closingState).from(this.closingState).take(TRANSITION_DEFAULT).to(this.closedState).from(this.closingState).take(TRANSITION_CLOSE).to(this.closedState).from(this.closedState).take(TRANSITION_CLOSE).to(this.closedState).build();
    private StateMachineAgent<SimpleStateMachineContext> stateMachineAgent = new StateMachineAgent<>(this.stateMachine);
    protected AtomicBoolean isCloseIssued = new AtomicBoolean(false);
    protected final AtomicInteger eventsInProcessing = new AtomicInteger(TRANSITION_DEFAULT);
    protected final AtomicInteger eventsProcessedSinceLastReplenishment = new AtomicInteger(TRANSITION_DEFAULT);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription$ClosedState.class */
    public class ClosedState implements WaitState<SimpleStateMachineContext> {
        ClosedState() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void work(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            if (EventSubscription.this.openFuture != null) {
                EventSubscription.this.openFuture.cancel(true);
                EventSubscription.this.openFuture = null;
            }
            if (EventSubscription.this.closeFuture != null) {
                EventSubscription.this.closeFuture.complete(EventSubscription.this.thisSubscription());
                EventSubscription.this.closeFuture = null;
            }
            EventSubscription.this.acquisition.stopManageSubscription(EventSubscription.this.thisSubscription());
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription$ClosingState.class */
    class ClosingState implements State<SimpleStateMachineContext> {
        ClosingState() {
        }

        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            if (EventSubscription.this.hasEventsInProcessing()) {
                return EventSubscription.TRANSITION_DEFAULT;
            }
            try {
                EventSubscription.this.requestSubscriptionClose();
            } catch (Exception e) {
                EventSubscription.LOGGER.warn("Exception when closing subscription", e);
            }
            simpleStateMachineContext.take(EventSubscription.TRANSITION_DEFAULT);
            return EventSubscription.TRANSITION_OPEN;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription$InitState.class */
    class InitState implements WaitState<SimpleStateMachineContext> {
        InitState() {
        }

        public void work(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription$OpenState.class */
    public class OpenState implements State<SimpleStateMachineContext> {
        OpenState() {
        }

        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            if (EventSubscription.this.openFuture != null) {
                EventSubscription.this.openFuture.complete(EventSubscription.this.thisSubscription());
                EventSubscription.this.openFuture = null;
            }
            return EventSubscription.this.replenishEventSource() ? EventSubscription.TRANSITION_OPEN : EventSubscription.TRANSITION_DEFAULT;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscription$OpeningState.class */
    class OpeningState implements State<SimpleStateMachineContext> {
        OpeningState() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            try {
                EventSubscriptionCreationResult requestNewSubscription = EventSubscription.this.requestNewSubscription();
                EventSubscription.this.subscriberKey = requestNewSubscription.getSubscriberKey();
                EventSubscription.this.eventSource = requestNewSubscription.getEventPublisher();
                EventSubscription.this.resetProcessingState();
                EventSubscription.this.acquisition.activateSubscription(EventSubscription.this.thisSubscription());
                simpleStateMachineContext.take(EventSubscription.TRANSITION_DEFAULT);
                return EventSubscription.TRANSITION_OPEN;
            } catch (Exception e) {
                EventSubscription.LOGGER.info("Could not open subscription; aborting", e);
                simpleStateMachineContext.take(3);
                return EventSubscription.TRANSITION_OPEN;
            }
        }
    }

    public EventSubscription(String str, int i, int i2, EventAcquisition<T> eventAcquisition) {
        this.pendingEvents = new ManyToManyConcurrentArrayQueue<>(i2);
        this.capacity = i2;
        this.acquisition = eventAcquisition;
        this.topic = str;
        this.partitionId = i;
    }

    public int maintainState() {
        return this.stateMachineAgent.doWork();
    }

    public RemoteAddress getEventSource() {
        return this.eventSource;
    }

    public void close() {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw new RuntimeException("Exception while closing subscription", e);
        }
    }

    public CompletableFuture<T> closeAsync() {
        this.isCloseIssued.set(true);
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        if (isClosed()) {
            completableFuture.complete(thisSubscription());
            return completableFuture;
        }
        this.stateMachineAgent.addCommand(simpleStateMachineContext -> {
            if (!simpleStateMachineContext.tryTake(TRANSITION_CLOSE)) {
                completableFuture.cancel(true);
            } else if (this.closeFuture == null) {
                this.closeFuture = completableFuture;
            } else {
                this.closeFuture.whenComplete((eventSubscription, th) -> {
                    if (th == null) {
                        completableFuture.complete(eventSubscription);
                    } else {
                        completableFuture.completeExceptionally(th);
                    }
                });
            }
        });
        return completableFuture;
    }

    public void open() {
        try {
            openAsync().get();
        } catch (Exception e) {
            throw new RuntimeException("Exception while opening subscription", e);
        }
    }

    public CompletableFuture<T> openAsync() {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        this.stateMachineAgent.addCommand(simpleStateMachineContext -> {
            if (simpleStateMachineContext.tryTake(TRANSITION_OPEN)) {
                this.openFuture = completableFuture;
            } else {
                completableFuture.cancel(true);
            }
        });
        return completableFuture;
    }

    public void reopenAsync() {
        this.stateMachineAgent.addCommand(simpleStateMachineContext -> {
            simpleStateMachineContext.tryTake(TRANSITION_REOPEN);
        });
    }

    public boolean isOpen() {
        return this.stateMachine.isInState(this.openState);
    }

    public boolean isClosed() {
        return this.stateMachine.isInState(this.closedState);
    }

    public int size() {
        return this.pendingEvents.size();
    }

    public boolean replenishEventSource() {
        int i = this.eventsProcessedSinceLastReplenishment.get();
        boolean z = ((double) (this.capacity - i)) < ((double) this.capacity) * REPLENISHMENT_THRESHOLD;
        if (z) {
            requestEventSourceReplenishment(i);
            this.eventsProcessedSinceLastReplenishment.addAndGet(-i);
        }
        return z;
    }

    public long getSubscriberKey() {
        return this.subscriberKey;
    }

    protected abstract void requestEventSourceReplenishment(int i);

    public boolean addEvent(GeneralEventImpl generalEventImpl) {
        boolean offer = this.pendingEvents.offer(generalEventImpl);
        if (!offer) {
            LOGGER.warn("Cannot add any more events. Event queue saturated. Postponing event.");
        }
        return offer;
    }

    protected T thisSubscription() {
        return this;
    }

    protected void resetProcessingState() {
        this.pendingEvents.clear();
        this.eventsInProcessing.set(TRANSITION_DEFAULT);
        this.eventsProcessedSinceLastReplenishment.set(TRANSITION_DEFAULT);
    }

    protected boolean hasEventsInProcessing() {
        return this.eventsInProcessing.get() > 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        GeneralEventImpl generalEventImpl;
        int size = size();
        int i = TRANSITION_DEFAULT;
        while (true) {
            if (i >= size || !isOpen() || this.isCloseIssued.get() || (generalEventImpl = (GeneralEventImpl) this.pendingEvents.poll()) == null) {
                break;
            }
            this.eventsInProcessing.incrementAndGet();
            try {
                if (!isOpen()) {
                    break;
                }
                i += TRANSITION_OPEN;
                logHandling(generalEventImpl);
                try {
                    checkedConsumer.accept(generalEventImpl);
                } catch (Exception e) {
                    onUnhandledEventHandlingException(generalEventImpl, e);
                }
                this.eventsInProcessing.decrementAndGet();
                this.eventsProcessedSinceLastReplenishment.incrementAndGet();
            } finally {
                this.eventsInProcessing.decrementAndGet();
                this.eventsProcessedSinceLastReplenishment.incrementAndGet();
            }
        }
        return i;
    }

    protected void logHandling(GeneralEventImpl generalEventImpl) {
        try {
            LOGGER.debug("{} handling event {}", this, generalEventImpl);
        } catch (Exception e) {
            LOGGER.warn("Could not construct or write log message", e);
        }
    }

    protected void onUnhandledEventHandlingException(GeneralEventImpl generalEventImpl, Exception exc) {
        throw new RuntimeException("Exception during handling of event " + generalEventImpl.getMetadata().getKey(), exc);
    }

    public String getTopicName() {
        return this.topic;
    }

    public int getPartitionId() {
        return this.partitionId;
    }

    protected abstract EventSubscriptionCreationResult requestNewSubscription();

    protected abstract void requestSubscriptionClose();

    public abstract boolean isManagedSubscription();

    public abstract int poll();
}
