/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker;

import java.util.function.Supplier;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.functions.worker.ErrorNotifier;
import org.apache.pulsar.functions.worker.FunctionAssignmentTailer;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
import org.apache.pulsar.functions.worker.MembershipManager;
import org.apache.pulsar.functions.worker.SchedulerManager;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.WorkerUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LeaderService
implements AutoCloseable,
ConsumerEventListener {
    private static final Logger log = LoggerFactory.getLogger(LeaderService.class);
    private final String consumerName;
    private final FunctionAssignmentTailer functionAssignmentTailer;
    private final ErrorNotifier errorNotifier;
    private final SchedulerManager schedulerManager;
    private final FunctionRuntimeManager functionRuntimeManager;
    private final FunctionMetaDataManager functionMetaDataManager;
    private final MembershipManager membershipManager;
    private ConsumerImpl<byte[]> consumer;
    private final WorkerConfig workerConfig;
    private final PulsarClient pulsarClient;
    private boolean isLeader = false;
    static final String COORDINATION_TOPIC_SUBSCRIPTION = "participants";
    private static String WORKER_IDENTIFIER = "id";

    public LeaderService(WorkerService workerService, PulsarClient pulsarClient, FunctionAssignmentTailer functionAssignmentTailer, SchedulerManager schedulerManager, FunctionRuntimeManager functionRuntimeManager, FunctionMetaDataManager functionMetaDataManager, MembershipManager membershipManager, ErrorNotifier errorNotifier) {
        this.workerConfig = workerService.getWorkerConfig();
        this.pulsarClient = pulsarClient;
        this.functionAssignmentTailer = functionAssignmentTailer;
        this.schedulerManager = schedulerManager;
        this.functionRuntimeManager = functionRuntimeManager;
        this.functionMetaDataManager = functionMetaDataManager;
        this.membershipManager = membershipManager;
        this.errorNotifier = errorNotifier;
        this.consumerName = String.format("%s:%s:%d", this.workerConfig.getWorkerId(), this.workerConfig.getWorkerHostname(), this.workerConfig.getTlsEnabled() ? this.workerConfig.getWorkerPortTls() : this.workerConfig.getWorkerPort());
    }

    public void start() throws PulsarClientException {
        this.consumer = (ConsumerImpl)this.pulsarClient.newConsumer().topic(this.workerConfig.getClusterCoordinationTopic()).subscriptionName(COORDINATION_TOPIC_SUBSCRIPTION).subscriptionType(SubscriptionType.Failover).consumerEventListener(this).property(WORKER_IDENTIFIER, this.consumerName).consumerName(this.consumerName).subscribe();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void becameActive(Consumer<?> consumer, int partitionId) {
        LeaderService leaderService = this;
        synchronized (leaderService) {
            if (this.isLeader) {
                return;
            }
            log.info("Worker {} became the leader.", (Object)this.consumerName);
            try {
                this.functionMetaDataManager.getIsInitialized().get();
                this.functionRuntimeManager.getIsInitialized().get();
                Supplier<Boolean> checkIsStillLeader = WorkerUtils.getIsStillLeaderSupplier(this.membershipManager, this.workerConfig.getWorkerId());
                Producer<byte[]> scheduleManagerExclusiveProducer = null;
                Producer<byte[]> functionMetaDataManagerExclusiveProducer = null;
                try {
                    scheduleManagerExclusiveProducer = this.schedulerManager.acquireExclusiveWrite(checkIsStillLeader);
                    functionMetaDataManagerExclusiveProducer = this.functionMetaDataManager.acquireExclusiveWrite(checkIsStillLeader);
                }
                catch (WorkerUtils.NotLeaderAnymore e) {
                    log.info("Worker {} is not leader anymore. Exiting becoming leader routine.", consumer);
                    if (scheduleManagerExclusiveProducer != null) {
                        scheduleManagerExclusiveProducer.close();
                    }
                    if (functionMetaDataManagerExclusiveProducer != null) {
                        functionMetaDataManagerExclusiveProducer.close();
                    }
                    return;
                }
                this.schedulerManager.initialize(scheduleManagerExclusiveProducer);
                this.functionAssignmentTailer.triggerReadToTheEndAndExit().get();
                this.functionAssignmentTailer.close();
                this.functionMetaDataManager.acquireLeadership(functionMetaDataManagerExclusiveProducer);
                this.isLeader = true;
            }
            catch (Throwable th) {
                log.error("Encountered error when initializing to become leader", th);
                this.errorNotifier.triggerError(th);
            }
        }
        this.schedulerManager.schedule();
    }

    @Override
    public synchronized void becameInactive(Consumer<?> consumer, int partitionId) {
        if (this.isLeader) {
            log.info("Worker {} lost the leadership.", (Object)this.consumerName);
            this.isLeader = false;
            try {
                this.schedulerManager.close();
                if (this.schedulerManager.getLastMessageProduced() == null) {
                    this.functionAssignmentTailer.start();
                } else {
                    this.functionAssignmentTailer.startFromMessage(this.schedulerManager.getLastMessageProduced());
                }
                this.functionMetaDataManager.giveupLeadership();
            }
            catch (Throwable th) {
                log.error("Encountered error in routine when worker lost leadership", th);
                this.errorNotifier.triggerError(th);
            }
        }
    }

    public synchronized boolean isLeader() {
        return this.isLeader;
    }

    @Override
    public void close() throws PulsarClientException {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}

