package io.zeebe.client.event.impl;

import io.zeebe.client.task.impl.subscription.Subscriber;
import io.zeebe.client.task.impl.subscription.SubscriberGroup;
import io.zeebe.client.task.impl.subscription.SubscriptionManager;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.sched.future.ActorFuture;
import io.zeebe.util.sched.future.CompletableActorFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/* loaded from: input_file:io/zeebe/client/event/impl/TopicSubscriber.class */
/**
 * A {@link Subscriber} for a single partition of a topic subscription.
 *
 * <p>Wraps every event handler passed to {@link #pollEvents(CheckedConsumer)} so that the position
 * of each successfully handled event is recorded, and acknowledges the most recently recorded
 * position whenever the superclass requests event-source replenishment.
 *
 * <p>How a handler failure is treated depends on {@link TopicSubscriptionSpec#isManaged()}: for a
 * managed subscription the handler is retried and, if it still fails, the subscriber is disabled and
 * its group is closed; otherwise the failure is logged and rethrown to the caller of
 * {@code pollEvents}.
 */
public class TopicSubscriber extends Subscriber {
    /** Retry count handed to {@code andOnExceptionRetry} for managed subscriptions. */
    protected static final int MAX_HANDLING_RETRIES = 2;

    protected final TopicClientImpl client;

    // Not read or written by this class after construction; kept for subclasses/other users.
    protected AtomicBoolean processingFlag;

    /** Position of the last event whose handler completed; volatile, written by {@link #recordProcessedEvent}. */
    protected volatile long lastProcessedEventPosition;

    /** Last position for which an acknowledge request has been issued (not necessarily confirmed). */
    protected long lastAcknowledgedPosition;

    protected final TopicSubscriptionSpec subscription;

    /** Decorates a user handler with position tracking and the failure policy chosen at construction. */
    protected final Function<CheckedConsumer<GeneralEventImpl>, CheckedConsumer<GeneralEventImpl>> eventHandlerAdapter;

    /**
     * Creates the subscriber and selects the failure policy for event handlers.
     *
     * @param topicClientImpl client used to send acknowledge and close requests
     * @param topicSubscriptionSpec subscription settings (topic, name, prefetch capacity, start position, managed flag)
     * @param j first argument of the superclass constructor (presumably the subscriber key; confirm against {@code Subscriber})
     * @param remoteAddress forwarded to the superclass constructor
     * @param i second argument of the superclass constructor, also used to look up the start position
     *     (presumably the partition id; confirm against {@code Subscriber})
     * @param subscriberGroup forwarded to the superclass constructor
     * @param subscriptionManager forwarded to the superclass constructor
     */
    public TopicSubscriber(TopicClientImpl topicClientImpl, TopicSubscriptionSpec topicSubscriptionSpec, long j, RemoteAddress remoteAddress, int i, SubscriberGroup subscriberGroup, SubscriptionManager subscriptionManager) {
        super(j, i, topicSubscriptionSpec.getPrefetchCapacity(), remoteAddress, subscriberGroup, subscriptionManager);
        this.processingFlag = new AtomicBoolean(false);
        this.subscription = topicSubscriptionSpec;
        this.client = topicClientImpl;
        // Both positions start at the configured start position, so nothing is acknowledged
        // until at least one event beyond it has been handled.
        this.lastProcessedEventPosition = topicSubscriptionSpec.getStartPosition(i);
        this.lastAcknowledgedPosition = topicSubscriptionSpec.getStartPosition(i);
        if (topicSubscriptionSpec.isManaged()) {
            // Managed: retry the handler, then give up and close the subscription.
            this.eventHandlerAdapter = handler -> handler
                    .andThen(this::recordProcessedEvent)
                    .andOnExceptionRetry(MAX_HANDLING_RETRIES, this::logRetry)
                    .andOnException(this::logExceptionAndClose);
        } else {
            // Unmanaged: the caller drives polling, so surface the failure to it.
            this.eventHandlerAdapter = handler -> handler
                    .andThen(this::recordProcessedEvent)
                    .andOnException(this::logExceptionAndPropagate);
        }
    }

    /**
     * Polls events via the superclass, passing the given handler wrapped by {@link #eventHandlerAdapter}.
     *
     * @param checkedConsumer the handler to invoke for each event
     * @return the value returned by {@code Subscriber.pollEvents}
     */
    @Override // io.zeebe.client.task.impl.subscription.Subscriber
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        return super.pollEvents(this.eventHandlerAdapter.apply(checkedConsumer));
    }

    /** Logs the failure, disables this subscriber and asks the acquisition to close the whole group. */
    protected void logExceptionAndClose(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Closing subscription.");
        disable();
        this.acquisition.closeGroup(this.group, "Event handling failed");
    }

    /**
     * Logs the failure and rethrows it wrapped in a {@link RuntimeException} (original kept as cause).
     *
     * @throws RuntimeException always
     */
    protected void logExceptionAndPropagate(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Propagating exception to caller.");
        throw new RuntimeException(exc);
    }

    /** Logs a handler failure that is about to be retried. */
    protected void logRetry(GeneralEventImpl generalEventImpl, Exception exc) {
        logEventHandlingError(exc, generalEventImpl, "Retrying.");
    }

    /**
     * Sends a close request for this subscriber's topic subscription.
     *
     * @return the future of the asynchronous close request
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.zeebe.client.task.impl.subscription.Subscriber
    public ActorFuture<Void> requestSubscriptionClose() {
        // Use the class logger instead of stdout so the message honours the logging configuration.
        LOGGER.debug("Closing subscriber at partition {}", this.partitionId);
        return this.client.closeTopicSubscription(this.partitionId, this.subscriberKey).mo10executeAsync();
    }

    /**
     * Replenishes the event source by acknowledging the last processed event.
     *
     * @param i ignored by this implementation
     * @return the future returned by {@link #acknowledgeLastProcessedEvent()}
     */
    @Override // io.zeebe.client.task.impl.subscription.Subscriber
    protected ActorFuture<?> requestEventSourceReplenishment(int i) {
        return acknowledgeLastProcessedEvent();
    }

    /**
     * Acknowledges the last processed position if it is greater than the last acknowledged one.
     *
     * <p>The acknowledged position is updated as soon as the request is issued, without waiting
     * for the returned future to complete.
     *
     * @return an already completed future when there is nothing new to acknowledge, otherwise the
     *     future of the acknowledge request
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public ActorFuture<?> acknowledgeLastProcessedEvent() {
        // Read the volatile field once so the comparison and the request use the same value.
        final long positionToAck = this.lastProcessedEventPosition;
        if (positionToAck <= this.lastAcknowledgedPosition) {
            return CompletableActorFuture.completed((Object) null);
        }
        final ActorFuture<TopicSubscriptionEvent> ackFuture = this.client
                .acknowledgeEvent(this.subscription.getTopic(), this.partitionId)
                .subscriptionName(this.subscription.getName())
                .ackPosition(positionToAck)
                .mo10executeAsync();
        this.lastAcknowledgedPosition = positionToAck;
        return ackFuture;
    }

    /** Remembers the position of the given event as the last processed one. */
    protected void recordProcessedEvent(GeneralEventImpl generalEventImpl) {
        this.lastProcessedEventPosition = generalEventImpl.getMetadata().getPosition();
    }

    /**
     * Logs an event-handling failure at error level.
     *
     * @param exc the exception thrown by the handler; passed last so the logger can treat it as the throwable
     * @param generalEventImpl the event being handled
     * @param str short description of what happens next (e.g. retrying, closing)
     */
    protected void logEventHandlingError(Exception exc, GeneralEventImpl generalEventImpl, String str) {
        LOGGER.error("Subscriber {}: Unhandled exception during handling of event {}. {}", this, generalEventImpl, str, exc);
    }

    /** @return the topic of the underlying subscription */
    @Override // io.zeebe.client.task.impl.subscription.Subscriber
    public String getTopicName() {
        return this.subscription.getTopic();
    }

    /** @return a description containing topic, partition, subscription name and subscriber key */
    @Override
    public String toString() {
        return "TopicSubscriber[topic=" + this.subscription.getTopic() + ", partition=" + this.partitionId + ", name=" + this.subscription.getName() + ", subscriberKey=" + this.subscriberKey + "]";
    }
}
