package org.apache.pulsar.broker.loadbalance.extensions.channel;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewSyncer.class */
/**
 * Keeps the system-topic based and the metadata-store based service unit state table views in sync.
 *
 * <p>{@link #start(PulsarService)} first copies the existing items from one table view to the other
 * (the direction is chosen by {@code loadBalancerServiceUnitTableViewSyncer} in the broker
 * configuration), then disables compaction on the system topics and finally opens two long-lived
 * table views whose updates are forwarded to each other. {@link #close()} re-enables compaction and
 * closes both long-lived table views.
 *
 * <p>NOTE(review): {@code isActive} is volatile but {@code start}/{@code close} are not otherwise
 * synchronized; callers are presumably expected to serialize them - confirm against the caller.
 */
public class ServiceUnitStateTableViewSyncer implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(ServiceUnitStateTableViewSyncer.class);
    /** Maximum number of put/delete operations kept in flight before waiting for them to complete. */
    private static final int MAX_CONCURRENT_SYNC_COUNT = 100;
    /** Maximum time to wait for both table views to report the same number of entries. */
    private static final int SYNC_WAIT_TIME_IN_SECS = 300;
    /** Pause between two size comparisons while waiting for the table views to converge. */
    private static final long SYNC_POLL_INTERVAL_IN_MILLIS = 100L;
    private PulsarService pulsar;
    private volatile ServiceUnitStateTableView systemTopicTableView;
    private volatile ServiceUnitStateTableView metadataStoreTableView;
    private volatile boolean isActive = false;

    /**
     * Starts the syncer. Does nothing when the syncer is disabled in the configuration or is
     * already active.
     *
     * <p>If this method fails, {@code isActive} stays {@code false}; because {@link #close()} is a
     * no-op while inactive, table views opened by a partially completed start are only closed by
     * the next call to this method.
     *
     * @param pulsarService the broker service whose configuration and stores are used
     * @throws IOException if a table view fails to start or close
     * @throws TimeoutException if the table views do not converge within
     *     {@value #SYNC_WAIT_TIME_IN_SECS} seconds, or a batch of operations times out
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if one of the sync operations fails
     * @throws IllegalStateException if compaction could not be disabled
     */
    public void start(PulsarService pulsarService) throws IOException, TimeoutException, InterruptedException, ExecutionException {
        if (!pulsarService.getConfiguration().isLoadBalancerServiceUnitTableViewSyncerEnabled() || this.isActive) {
            return;
        }
        this.pulsar = pulsarService;
        try {
            syncExistingItems();
            if (!ExtensibleLoadManagerImpl.configureSystemTopics(pulsarService, 0L)) {
                throw new IllegalStateException("Failed to disable compaction");
            }
            syncTailItems();
            this.isActive = true;
        } catch (Throwable th) {
            log.error("Failed to start ServiceUnitStateTableViewSyncer", th);
            throw th;
        }
    }

    /** Forwards an item to the long-lived system topic table view. */
    private CompletableFuture<Void> syncToSystemTopic(String str, ServiceUnitStateData serviceUnitStateData) {
        return this.systemTopicTableView.put(str, serviceUnitStateData);
    }

    /** Forwards an item to the long-lived metadata store table view. */
    private CompletableFuture<Void> syncToMetadataStore(String str, ServiceUnitStateData serviceUnitStateData) {
        return this.metadataStoreTableView.put(str, serviceUnitStateData);
    }

    /** No-op item handler, used where a table view callback must be supplied but nothing is to be done. */
    private void dummy(String str, ServiceUnitStateData serviceUnitStateData) {
    }

    /**
     * Copies the items that already exist in the source table view into the destination table view,
     * after wiping the destination, and waits until both report the same number of entries.
     * Both temporary table views are closed before returning, whether or not the sync succeeded.
     */
    private void syncExistingItems() throws IOException, ExecutionException, InterruptedException, TimeoutException {
        long startedAtMillis = System.currentTimeMillis();
        ServiceUnitStateTableView metadataStoreView = new ServiceUnitStateMetadataStoreTableViewImpl();
        try {
            metadataStoreView.start(this.pulsar, this::dummy, this::dummy);
            ServiceUnitStateTableView systemTopicView = new ServiceUnitStateTableViewImpl();
            try {
                systemTopicView.start(this.pulsar, this::dummy, this::dummy);
                ServiceConfiguration.ServiceUnitTableViewSyncerType syncerType =
                        this.pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
                if (syncerType == ServiceConfiguration.ServiceUnitTableViewSyncerType.SystemTopicToMetadataStoreSyncer) {
                    clean(metadataStoreView);
                    syncExistingItemsToMetadataStore(systemTopicView);
                } else {
                    clean(systemTopicView);
                    syncExistingItemsToSystemTopic(metadataStoreView, systemTopicView);
                }
                if (!waitUntilSynced(metadataStoreView, systemTopicView, startedAtMillis)) {
                    throw new TimeoutException(syncerType
                            + " failed to sync existing items in tableviews. MetadataStoreTableView.size: "
                            + metadataStoreView.entrySet().size()
                            + ", SystemTopicTableView.size: " + systemTopicView.entrySet().size()
                            + " in " + SYNC_WAIT_TIME_IN_SECS + " secs");
                }
                log.info("Synced existing items MetadataStoreTableView.size:{} , SystemTopicTableView.size: {} in {} secs",
                        metadataStoreView.entrySet().size(),
                        systemTopicView.entrySet().size(),
                        TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startedAtMillis));
            } finally {
                systemTopicView.close();
            }
        } finally {
            metadataStoreView.close();
        }
    }

    /**
     * (Re)creates the two long-lived table views, wiring each one so that the items it reports are
     * put into the other, and waits until both report the same number of entries.
     */
    private void syncTailItems() throws InterruptedException, IOException, TimeoutException {
        long startedAtMillis = System.currentTimeMillis();
        // Drop any table views left over from a previous (possibly failed) start.
        if (this.metadataStoreTableView != null) {
            this.metadataStoreTableView.close();
            this.metadataStoreTableView = null;
        }
        if (this.systemTopicTableView != null) {
            this.systemTopicTableView.close();
            this.systemTopicTableView = null;
        }
        this.metadataStoreTableView = new ServiceUnitStateMetadataStoreTableViewImpl();
        this.metadataStoreTableView.start(this.pulsar, this::syncToSystemTopic, this::dummy);
        log.info("Started MetadataStoreTableView");
        this.systemTopicTableView = new ServiceUnitStateTableViewImpl();
        this.systemTopicTableView.start(this.pulsar, this::syncToMetadataStore, this::dummy);
        log.info("Started SystemTopicTableView");
        ServiceConfiguration.ServiceUnitTableViewSyncerType syncerType =
                this.pulsar.getConfiguration().getLoadBalancerServiceUnitTableViewSyncer();
        if (!waitUntilSynced(this.metadataStoreTableView, this.systemTopicTableView, startedAtMillis)) {
            throw new TimeoutException(syncerType
                    + " failed to sync tableviews. MetadataStoreTableView.size: "
                    + this.metadataStoreTableView.entrySet().size()
                    + ", SystemTopicTableView.size: " + this.systemTopicTableView.entrySet().size()
                    + " in " + SYNC_WAIT_TIME_IN_SECS + " secs");
        }
        log.info("Successfully started ServiceUnitStateTableViewSyncer MetadataStoreTableView.size:{} , SystemTopicTableView.size: {} in {} secs",
                this.metadataStoreTableView.entrySet().size(),
                this.systemTopicTableView.entrySet().size(),
                TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startedAtMillis));
    }

    /**
     * Writes every item of {@code serviceUnitStateTableView} straight into the local metadata store
     * under {@code /service_unit_state/<key>}, at most {@value #MAX_CONCURRENT_SYNC_COUNT} writes at a time.
     */
    private void syncExistingItemsToMetadataStore(ServiceUnitStateTableView serviceUnitStateTableView) throws JsonProcessingException, ExecutionException, InterruptedException, TimeoutException {
        MetadataStoreExtended store = this.pulsar.getLocalMetadataStore();
        ObjectWriter writer = ObjectMapperFactory.getMapper().writer();
        int opTimeoutSecs = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        List<CompletableFuture<?>> inFlight = new ArrayList<>();
        Iterator<Map.Entry<String, ServiceUnitStateData>> it = serviceUnitStateTableView.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ServiceUnitStateData> item = it.next();
            inFlight.add(store.put("/service_unit_state/" + item.getKey(),
                    writer.writeValueAsBytes(item.getValue()), Optional.empty()));
            awaitBatchIfDue(inFlight, !it.hasNext(), opTimeoutSecs);
        }
    }

    /**
     * Puts every item of the first table view into the second one, at most
     * {@value #MAX_CONCURRENT_SYNC_COUNT} puts at a time.
     */
    private void syncExistingItemsToSystemTopic(ServiceUnitStateTableView serviceUnitStateTableView, ServiceUnitStateTableView serviceUnitStateTableView2) throws ExecutionException, InterruptedException, TimeoutException {
        int opTimeoutSecs = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        List<CompletableFuture<?>> inFlight = new ArrayList<>();
        Iterator<Map.Entry<String, ServiceUnitStateData>> it = serviceUnitStateTableView.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ServiceUnitStateData> item = it.next();
            inFlight.add(serviceUnitStateTableView2.put(item.getKey(), item.getValue()));
            awaitBatchIfDue(inFlight, !it.hasNext(), opTimeoutSecs);
        }
    }

    /**
     * Deletes every entry currently visible in the given table view, at most
     * {@value #MAX_CONCURRENT_SYNC_COUNT} deletes at a time.
     */
    private void clean(ServiceUnitStateTableView serviceUnitStateTableView) throws ExecutionException, InterruptedException, TimeoutException {
        int opTimeoutSecs = this.pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
        Iterator<Map.Entry<String, ServiceUnitStateData>> it = serviceUnitStateTableView.entrySet().iterator();
        List<CompletableFuture<?>> inFlight = new ArrayList<>();
        while (it.hasNext()) {
            inFlight.add(serviceUnitStateTableView.delete(it.next().getKey()));
            awaitBatchIfDue(inFlight, !it.hasNext(), opTimeoutSecs);
        }
    }

    /**
     * Waits for the in-flight operations when the batch is full or no more items will follow, then
     * empties the batch so that the next one is again capped at {@value #MAX_CONCURRENT_SYNC_COUNT}.
     *
     * @param inFlight the operations issued since the last wait; cleared after a successful wait
     * @param lastItem whether the source iterator is exhausted
     * @param timeoutSecs maximum time to wait for the batch to complete
     */
    private static void awaitBatchIfDue(List<CompletableFuture<?>> inFlight, boolean lastItem, int timeoutSecs) throws ExecutionException, InterruptedException, TimeoutException {
        if (inFlight.size() >= MAX_CONCURRENT_SYNC_COUNT || lastItem) {
            FutureUtil.waitForAll(inFlight).get(timeoutSecs, TimeUnit.SECONDS);
            // Without this the size check above would never match again after the first batch.
            inFlight.clear();
        }
    }

    /**
     * Polls until both table views report the same number of entries.
     *
     * @param startedAtMillis wall-clock time the overall sync started; the deadline is measured from it
     * @return {@code true} once the sizes match, {@code false} if more than
     *     {@value #SYNC_WAIT_TIME_IN_SECS} seconds have elapsed since {@code startedAtMillis}
     */
    private boolean waitUntilSynced(ServiceUnitStateTableView first, ServiceUnitStateTableView second, long startedAtMillis) throws InterruptedException {
        while (first.entrySet().size() != second.entrySet().size()) {
            long elapsedSecs = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis() - startedAtMillis);
            if (elapsedSecs > SYNC_WAIT_TIME_IN_SECS) {
                return false;
            }
            Thread.sleep(SYNC_POLL_INTERVAL_IN_MILLIS);
        }
        return true;
    }

    /**
     * Re-enables compaction on the system topics and closes both long-lived table views.
     * Does nothing when the syncer is not active.
     *
     * @throws IOException if closing one of the table views fails
     * @throws IllegalStateException if compaction could not be re-enabled
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (!this.isActive) {
            return;
        }
        if (!ExtensibleLoadManagerImpl.configureSystemTopics(this.pulsar, ExtensibleLoadManagerImpl.COMPACTION_THRESHOLD)) {
            throw new IllegalStateException("Failed to enable compaction");
        }
        try {
            if (this.systemTopicTableView != null) {
                this.systemTopicTableView.close();
                this.systemTopicTableView = null;
                log.info("Closed SystemTopicTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close SystemTopicTableView", e);
            throw e;
        }
        // Kept as a separate try block so a failure here is not also reported as a
        // SystemTopicTableView failure.
        try {
            if (this.metadataStoreTableView != null) {
                this.metadataStoreTableView.close();
                this.metadataStoreTableView = null;
                log.info("Closed MetadataStoreTableView");
            }
        } catch (Exception e) {
            log.error("Failed to close MetadataStoreTableView", e);
            throw e;
        }
        log.info("Successfully closed ServiceUnitStateTableViewSyncer.");
        this.isActive = false;
    }

    /** Returns whether the syncer has been started successfully and not yet closed. */
    public boolean isActive() {
        return this.isActive;
    }
}
