package org.apache.pulsar.broker.loadbalance.extensions.channel;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateTableViewImpl.class */
/**
 * Service-unit state table view backed by a Pulsar system topic.
 *
 * <p>Writes go through a {@link Producer} on {@link #TOPIC}; reads are served from a
 * {@link TableView} created on the same topic. Both handles are created by
 * {@link #start} and released by {@link #close()}; every other operation fails with an
 * {@link IllegalStateException} (or a failed future, for {@link #put}/{@link #delete})
 * while either handle is missing.
 *
 * <p>The two handles are {@code volatile} and each operation reads them exactly once into
 * locals, so a concurrent {@link #close()} cannot turn a passed "is started" check into a
 * {@link NullPointerException}.
 */
public class ServiceUnitStateTableViewImpl extends ServiceUnitStateTableViewBase {
    private static final Logger log = LoggerFactory.getLogger(ServiceUnitStateTableViewImpl.class);

    /** Fully qualified name of the system topic that stores the service-unit states. */
    public static final String TOPIC = TopicName.get(TopicDomain.persistent.value(), NamespaceName.SYSTEM_NAMESPACE, "loadbalancer-service-unit-state").toString();

    /** Compression applied to messages published by the channel producer. */
    public static final CompressionType MSG_COMPRESSION_TYPE = CompressionType.ZSTD;

    /** Value passed to the producer builder's {@code maxPendingMessages}. */
    private static final int MAX_OUTSTANDING_PUB_MESSAGES = 500;

    /** Message reported whenever an operation is attempted without both handles present. */
    private static final String NOT_STARTED_MESSAGE = "The tableview has not been started.";

    private volatile Producer<ServiceUnitStateData> producer;
    private volatile TableView<ServiceUnitStateData> tableview;

    /**
     * Creates the system topic, then (re)creates the channel producer and the table view.
     *
     * <p>Any producer or table view left from an earlier call is closed before its
     * replacement is created.
     *
     * @param pulsarService broker service supplying the configuration and the client
     * @param biConsumer    callback registered on the table view through {@code listen}
     * @param biConsumer2   callback applied to the table view through {@code forEach}
     * @throws IOException if closing an old handle or creating a new one fails
     */
    @Override
    public void start(PulsarService pulsarService, BiConsumer<String, ServiceUnitStateData> biConsumer, BiConsumer<String, ServiceUnitStateData> biConsumer2) throws IOException {
        final boolean debug = ExtensibleLoadManagerImpl.debug(pulsarService.getConfiguration(), log);
        init(pulsarService);
        final Schema<ServiceUnitStateData> schema = Schema.JSON(ServiceUnitStateData.class);
        ExtensibleLoadManagerImpl.createSystemTopic(pulsarService, TOPIC);

        final Producer<ServiceUnitStateData> previousProducer = this.producer;
        if (previousProducer != null) {
            previousProducer.close();
            if (debug) {
                log.info("Closed the channel producer.");
            }
        }
        this.producer = pulsarService.getClient().newProducer(schema)
                .enableBatching(true)
                .compressionType(MSG_COMPRESSION_TYPE)
                .maxPendingMessages(MAX_OUTSTANDING_PUB_MESSAGES)
                .blockIfQueueFull(true)
                .topic(TOPIC)
                .create();
        if (debug) {
            log.info("Successfully started the channel producer.");
        }

        final TableView<ServiceUnitStateData> previousView = this.tableview;
        if (previousView != null) {
            previousView.close();
            if (debug) {
                log.info("Closed the channel tableview.");
            }
        }
        final TableView<ServiceUnitStateData> view = pulsarService.getClient().newTableViewBuilder(schema)
                .topic(TOPIC)
                .loadConf(Map.of("topicCompactionStrategyClassName", ServiceUnitStateDataConflictResolver.class.getName()))
                .create();
        this.tableview = view;

        // Registration order is kept as-is: listeners first, then the forEach passes.
        // Work on the local handle so a concurrent close() cannot null it mid-sequence.
        view.listen(this::updateOwnedServiceUnits);
        view.listen(biConsumer);
        view.forEach(this::updateOwnedServiceUnits);
        view.forEach(biConsumer2);
    }

    /**
     * Tells whether both handles of a snapshot are present.
     *
     * @param view            snapshot of the table view field
     * @param channelProducer snapshot of the producer field
     * @return {@code true} only when neither argument is {@code null}
     */
    private static boolean isValidState(TableView<ServiceUnitStateData> view, Producer<ServiceUnitStateData> channelProducer) {
        return view != null && channelProducer != null;
    }

    /**
     * Closes the table view and then the producer.
     *
     * <p>The producer close is attempted even when the table view close fails, so one
     * failing handle does not leak the other. A field is reset to {@code null} only after
     * its handle closed successfully. When both closes fail, the first failure is thrown
     * and the second is attached to it as a suppressed exception.
     *
     * @throws IOException if closing either handle fails
     */
    @Override
    public void close() throws IOException {
        Exception failure = null;

        final TableView<ServiceUnitStateData> view = this.tableview;
        if (view != null) {
            try {
                view.close();
                this.tableview = null;
                log.info("Successfully closed the channel tableview.");
            } catch (IOException | RuntimeException e) {
                failure = e;
            }
        }

        final Producer<ServiceUnitStateData> channelProducer = this.producer;
        if (channelProducer != null) {
            try {
                channelProducer.close();
                this.producer = null;
                log.info("Successfully closed the channel producer.");
            } catch (IOException | RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }

        // Only IOException and RuntimeException are ever captured above.
        if (failure instanceof IOException) {
            throw (IOException) failure;
        }
        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
    }

    /**
     * Looks up the state stored in the table view for a key.
     *
     * @param str key to look up
     * @return whatever the table view returns for the key
     * @throws IllegalStateException if the table view or the producer is missing
     */
    @Override
    public ServiceUnitStateData get(String str) {
        final TableView<ServiceUnitStateData> view = this.tableview;
        if (!isValidState(view, this.producer)) {
            throw new IllegalStateException(NOT_STARTED_MESSAGE);
        }
        return view.get(str);
    }

    /**
     * Publishes a keyed state message asynchronously through the channel producer.
     *
     * @param str                  message key
     * @param serviceUnitStateData message value; {@link #delete} passes {@code null}
     * @return a future that completes normally once the send succeeds, or exceptionally
     *         with the send failure; already failed with {@link IllegalStateException}
     *         when the table view or the producer is missing
     */
    @Override
    public CompletableFuture<Void> put(String str, ServiceUnitStateData serviceUnitStateData) {
        final Producer<ServiceUnitStateData> channelProducer = this.producer;
        if (!isValidState(this.tableview, channelProducer)) {
            return CompletableFuture.failedFuture(new IllegalStateException(NOT_STARTED_MESSAGE));
        }
        final CompletableFuture<Void> result = new CompletableFuture<>();
        channelProducer.newMessage().key(str).value(serviceUnitStateData).sendAsync().whenComplete((messageId, failure) -> {
            if (failure == null) {
                result.complete(null);
                return;
            }
            // A closed producer is logged at info level only; any other failure is an error.
            if (failure instanceof PulsarClientException.AlreadyClosedException) {
                log.info("Skip publishing the message since the producer is closed, serviceUnit: {}, data: {}", str, serviceUnitStateData);
            } else {
                log.error("Failed to publish the message: serviceUnit:{}, data:{}", str, serviceUnitStateData, failure);
            }
            result.completeExceptionally(failure);
        });
        return result;
    }

    /**
     * Flushes the producer and then refreshes the table view within one shared time budget.
     *
     * <p>The producer flush may wait up to {@code j} milliseconds; the refresh then gets
     * whatever is left of that budget, floored at zero.
     *
     * @param j overall wait budget in milliseconds
     * @throws IllegalStateException if the table view or the producer is missing
     * @throws InterruptedException  if the calling thread is interrupted while waiting
     * @throws TimeoutException      if either step exceeds its share of the budget
     * @throws ExecutionException    if either step completes exceptionally
     */
    @Override
    public void flush(long j) throws InterruptedException, TimeoutException, ExecutionException {
        final TableView<ServiceUnitStateData> view = this.tableview;
        final Producer<ServiceUnitStateData> channelProducer = this.producer;
        if (!isValidState(view, channelProducer)) {
            throw new IllegalStateException(NOT_STARTED_MESSAGE);
        }
        final long deadlineMillis = System.currentTimeMillis() + j;
        channelProducer.flushAsync().get(j, TimeUnit.MILLISECONDS);
        final long remainingMillis = Math.max(0L, deadlineMillis - System.currentTimeMillis());
        view.refreshAsync().get(remainingMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Deletes a key by publishing a {@code null} value for it.
     *
     * @param str key to delete
     * @return the future returned by {@link #put(String, ServiceUnitStateData)}
     */
    @Override
    public CompletableFuture<Void> delete(String str) {
        return put(str, null);
    }

    /**
     * Returns the entry set exposed by the table view.
     *
     * @return the table view's entry set
     * @throws IllegalStateException if the table view or the producer is missing
     */
    @Override
    public Set<Map.Entry<String, ServiceUnitStateData>> entrySet() {
        final TableView<ServiceUnitStateData> view = this.tableview;
        if (!isValidState(view, this.producer)) {
            throw new IllegalStateException(NOT_STARTED_MESSAGE);
        }
        return view.entrySet();
    }

    /**
     * Delegates straight to the superclass implementation.
     *
     * <p>NOTE(review): the bridge/synthetic markers suggest this method was emitted by the
     * compiler rather than hand-written; the raw return type is kept as found because the
     * superclass declaration is not visible here. Confirm before removing or re-typing.
     */
    @Override
    public /* bridge */ /* synthetic */ Set ownedServiceUnits() {
        return super.ownedServiceUnits();
    }
}
