package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.impl.Loggers;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.sched.ActorCondition;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.concurrent.ManyToManyConcurrentArrayQueue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/Subscriber.class */
/**
 * Base class for a subscriber that buffers received events in a bounded queue, hands them to a
 * consumer via {@link #pollEvents(CheckedConsumer)}, and keeps count of how many events were
 * processed so that the event source can be asked to send more ("replenishment").
 *
 * <p>Concrete subclasses supply the actual replenishment request, the subscription-close request
 * and the topic name.
 */
public abstract class Subscriber {
    protected static final Logger LOGGER = Loggers.SUBSCRIPTION_LOGGER;
    protected static final String LOG_MESSAGE_PREFIX = "Subscriber {}: ";

    /**
     * Fraction of {@link #capacity}; once the capacity minus the events processed since the last
     * replenishment is at or below this share, the replenishment trigger is signalled.
     */
    public static final double REPLENISHMENT_THRESHOLD = 0.3d;

    private static final int STATE_OPEN = 0;
    private static final int STATE_DISABLED = 1;

    protected final long subscriberKey;
    /** Events received via {@link #addEvent(GeneralEventImpl)} that have not been polled yet. */
    protected final ManyToManyConcurrentArrayQueue<GeneralEventImpl> pendingEvents;
    /** Size the pending-event queue was created with. */
    protected final int capacity;
    protected final SubscriptionManager acquisition;
    protected final SubscriberGroup<?> group;
    protected RemoteAddress eventSource;
    protected int partitionId;
    private final ActorCondition replenishmentTrigger;

    /** Number of events taken from the queue whose handling has not finished yet. */
    protected final AtomicInteger eventsInProcessing = new AtomicInteger(0);
    /** Number of events handled since the event source was last asked for more. */
    protected final AtomicInteger eventsProcessedSinceLastReplenishment = new AtomicInteger(0);

    private volatile int state = STATE_OPEN;

    /**
     * Creates an open subscriber with an empty pending-event queue.
     *
     * @param subscriberKey key identifying this subscriber
     * @param partitionId id of the partition this subscriber receives events from
     * @param capacity capacity of the pending-event queue
     * @param eventSource address the events originate from
     * @param group group this subscriber belongs to; also used to build the replenishment trigger
     * @param acquisition subscription manager stored for use by subclasses
     */
    public Subscriber(long subscriberKey, int partitionId, int capacity, RemoteAddress eventSource, SubscriberGroup group, SubscriptionManager acquisition) {
        this.subscriberKey = subscriberKey;
        this.eventSource = eventSource;
        this.pendingEvents = new ManyToManyConcurrentArrayQueue<>(capacity);
        this.capacity = capacity;
        this.group = group;
        this.acquisition = acquisition;
        this.partitionId = partitionId;
        // Kept as the last statement: `this` is handed out here, so all fields must be assigned.
        this.replenishmentTrigger = group.buildReplenishmentTrigger(this);
    }

    /** @return the address the events originate from */
    public RemoteAddress getEventSource() {
        return this.eventSource;
    }

    /** @return {@code true} until {@link #disable()} has been called */
    public boolean isOpen() {
        return this.state == STATE_OPEN;
    }

    /** @return the number of events currently waiting in the pending-event queue */
    public int size() {
        return this.pendingEvents.size();
    }

    /**
     * @return {@code true} if the capacity minus the events processed since the last replenishment
     *     is at or below {@link #REPLENISHMENT_THRESHOLD} of the capacity
     */
    private boolean shouldReplenishEventSource() {
        final int remaining = this.capacity - this.eventsProcessedSinceLastReplenishment.get();
        return ((double) remaining) <= ((double) this.capacity) * REPLENISHMENT_THRESHOLD;
    }

    /**
     * Requests replenishment for all events processed since the previous request.
     *
     * @return an already completed future if nothing was processed since the last replenishment,
     *     otherwise the future returned by {@link #requestEventSourceReplenishment(int)}
     */
    public ActorFuture<?> replenishEventSource() {
        final int processed = this.eventsProcessedSinceLastReplenishment.get();
        if (processed <= 0) {
            return CompletableActorFuture.completed((Object) null);
        }
        final ActorFuture<?> replenishment = requestEventSourceReplenishment(processed);
        // Subtract instead of resetting to zero so that increments that happened after the read
        // above are retained for the next replenishment.
        this.eventsProcessedSinceLastReplenishment.addAndGet(-processed);
        return replenishment;
    }

    /** @return the key identifying this subscriber */
    public long getSubscriberKey() {
        return this.subscriberKey;
    }

    /**
     * Asks the event source to replenish the given number of events.
     *
     * @param eventsProcessed number of events processed since the last replenishment (always
     *     positive when called from {@link #replenishEventSource()})
     * @return future of the replenishment request
     */
    protected abstract ActorFuture<?> requestEventSourceReplenishment(int eventsProcessed);

    /**
     * Offers an event to the pending-event queue; a warning is logged if the queue rejects it.
     *
     * @param generalEventImpl the event to enqueue
     * @return {@code true} if the event was enqueued, {@code false} if the queue did not accept it
     */
    public boolean addEvent(GeneralEventImpl generalEventImpl) {
        final boolean added = this.pendingEvents.offer(generalEventImpl);
        if (!added) {
            LOGGER.warn(LOG_MESSAGE_PREFIX + "Cannot add any more events. Event queue saturated. Postponing event {}.", this, generalEventImpl);
        }
        return added;
    }

    /** Drops all pending events and resets both processing counters to zero. */
    protected void resetProcessingState() {
        this.pendingEvents.clear();
        this.eventsInProcessing.set(0);
        this.eventsProcessedSinceLastReplenishment.set(0);
    }

    /** @return {@code true} if at least one polled event is still being handled */
    public boolean hasEventsInProcessing() {
        return this.eventsInProcessing.get() > 0;
    }

    /** Marks this subscriber as disabled; {@link #pollEvents(CheckedConsumer)} stops handing out events. */
    public void disable() {
        this.state = STATE_DISABLED;
    }

    /**
     * Passes pending events to the given consumer, one at a time.
     *
     * <p>At most as many events as were queued when the method was entered are handled, and
     * polling stops early once the subscriber is no longer open or the queue is empty. Every event
     * taken from the queue is counted exactly once towards the next replenishment, whether its
     * handling succeeded, failed, or was skipped because the subscriber was disabled in between.
     *
     * @param checkedConsumer handler invoked for each event; exceptions it throws are routed to
     *     {@link #onUnhandledEventHandlingException(GeneralEventImpl, Exception)}
     * @return the number of events that were passed to the consumer
     */
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        // Snapshot the size up front so the loop is bounded even if new events keep arriving.
        final int currentlyAvailableEvents = size();
        int handledEvents = 0;

        while (handledEvents < currentlyAvailableEvents && isOpen()) {
            final GeneralEventImpl event = this.pendingEvents.poll();
            if (event == null) {
                break;
            }

            this.eventsInProcessing.incrementAndGet();
            try {
                // Re-check after marking the event as in processing: the subscriber may have been
                // disabled between the loop condition and the increment above.
                if (isOpen()) {
                    handledEvents++;
                    logHandling(event);
                    try {
                        checkedConsumer.accept(event);
                    } catch (Exception e) {
                        onUnhandledEventHandlingException(event, e);
                    }
                }
            } finally {
                // Accounting lives only here so it runs exactly once per polled event on every
                // path (success, exception, skipped). Doing it in the try block as well would
                // drive eventsInProcessing negative and double the replenished amount.
                this.eventsInProcessing.decrementAndGet();
                this.eventsProcessedSinceLastReplenishment.incrementAndGet();
                if (shouldReplenishEventSource()) {
                    this.replenishmentTrigger.signal();
                }
            }
        }
        return handledEvents;
    }

    /**
     * Writes a trace log entry for the event about to be handled. Failures while logging are
     * downgraded to a warning so they cannot prevent the event from being handled.
     *
     * @param generalEventImpl the event about to be handled
     */
    protected void logHandling(GeneralEventImpl generalEventImpl) {
        try {
            LOGGER.trace(LOG_MESSAGE_PREFIX + "Handling event {}", this, generalEventImpl);
        } catch (Exception e) {
            LOGGER.warn("Could not construct or write log message", e);
        }
    }

    /**
     * Called when the consumer passed to {@link #pollEvents(CheckedConsumer)} throws. This
     * implementation rethrows the failure wrapped in a {@link RuntimeException} that names the
     * event key; subclasses may override it.
     *
     * @param generalEventImpl the event whose handling failed
     * @param exc the exception thrown by the consumer
     */
    protected void onUnhandledEventHandlingException(GeneralEventImpl generalEventImpl, Exception exc) {
        throw new RuntimeException("Exception during handling of event " + generalEventImpl.getMetadata().getKey(), exc);
    }

    /** @return the name of the topic this subscriber is subscribed to */
    public abstract String getTopicName();

    /** @return the id of the partition this subscriber receives events from */
    public int getPartitionId() {
        return this.partitionId;
    }

    /**
     * Requests that the subscription backing this subscriber be closed.
     *
     * @return future of the close request
     */
    public abstract ActorFuture<Void> requestSubscriptionClose();
}
