package org.apache.pulsar.broker.loadbalance.extensions.store;

import java.io.IOException;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/loadbalance/extensions/store/TableViewLoadDataStoreImpl.class */
public class TableViewLoadDataStoreImpl<T> implements LoadDataStore<T> {
    private static final Logger log = LoggerFactory.getLogger(TableViewLoadDataStoreImpl.class);
    private static final long LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART = 2;
    private static final long INIT_TIMEOUT_IN_SECS = 5;
    private volatile TableView<T> tableView;
    private volatile long tableViewLastUpdateTimestamp;
    private volatile Producer<T> producer;
    private final ServiceConfiguration conf;
    private final PulsarClient client;
    private final String topic;
    private final Class<T> clazz;

    public TableViewLoadDataStoreImpl(PulsarService pulsarService, String str, Class<T> cls) throws LoadDataStoreException {
        try {
            this.conf = pulsarService.getConfiguration();
            this.client = pulsarService.getClient();
            this.topic = str;
            this.clazz = cls;
        } catch (Exception e) {
            throw new LoadDataStoreException(e);
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized CompletableFuture<Void> pushAsync(String str, T t) {
        validateProducer();
        return this.producer.newMessage().key(str).value(t).sendAsync().thenAccept((Consumer) messageId -> {
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized CompletableFuture<Void> removeAsync(String str) {
        validateProducer();
        return this.producer.newMessage().key(str).value((Object) null).sendAsync().thenAccept((Consumer) messageId -> {
        });
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized Optional<T> get(String str) {
        validateTableView();
        return Optional.ofNullable(this.tableView.get(str));
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void forEach(BiConsumer<String, T> biConsumer) {
        validateTableView();
        this.tableView.forEach(biConsumer);
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized Set<Map.Entry<String, T>> entrySet() {
        validateTableView();
        return this.tableView.entrySet();
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized int size() {
        validateTableView();
        return this.tableView.size();
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void closeTableView() throws IOException {
        if (this.tableView != null) {
            this.tableView.close();
            this.tableView = null;
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void start() throws LoadDataStoreException {
        startProducer();
        startTableView();
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void startTableView() throws LoadDataStoreException {
        if (this.tableView == null) {
            try {
                this.tableView = (TableView) this.client.newTableViewBuilder(Schema.JSON(this.clazz)).topic(this.topic).createAsync().get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
                this.tableView.forEachAndListen((str, obj) -> {
                    this.tableViewLastUpdateTimestamp = System.currentTimeMillis();
                });
            } catch (Exception e) {
                this.tableView = null;
                throw new LoadDataStoreException(e);
            }
        }
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void startProducer() throws LoadDataStoreException {
        if (this.producer == null) {
            try {
                this.producer = (Producer) this.client.newProducer(Schema.JSON(this.clazz)).topic(this.topic).createAsync().get(INIT_TIMEOUT_IN_SECS, TimeUnit.SECONDS);
            } catch (Exception e) {
                this.producer = null;
                throw new LoadDataStoreException(e);
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() throws IOException {
        if (this.producer != null) {
            this.producer.close();
            this.producer = null;
        }
        closeTableView();
    }

    @Override // org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore
    public synchronized void init() throws IOException {
        close();
        start();
    }

    private void validateProducer() {
        if (this.producer == null) {
            try {
                startProducer();
                log.info("Restarted producer on {}", this.topic);
            } catch (Exception e) {
                log.error("Failed to restart producer on {}", this.topic, e);
                throw new RuntimeException(e);
            }
        }
    }

    private void validateTableView() {
        String str = null;
        if (this.tableView == null) {
            str = "table view is null";
        } else {
            long currentTimeMillis = System.currentTimeMillis() - this.tableViewLastUpdateTimestamp;
            long millis = TimeUnit.MINUTES.toMillis(this.conf.getLoadBalancerReportUpdateMaxIntervalMinutes()) * LOAD_DATA_REPORT_UPDATE_MAX_INTERVAL_MULTIPLIER_BEFORE_RESTART;
            if (currentTimeMillis > millis) {
                str = String.format("inactiveDuration=%d secs > threshold = %d secs", Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis)), Long.valueOf(TimeUnit.MILLISECONDS.toSeconds(millis)));
            }
        }
        if (StringUtils.isNotBlank(str)) {
            this.tableViewLastUpdateTimestamp = 0L;
            try {
                closeTableView();
                startTableView();
                log.info("Restarted tableview on {}, {}", this.topic, str);
            } catch (Exception e) {
                log.error("Failed to restart tableview on {}", this.topic, e);
                throw new RuntimeException(e);
            }
        }
    }
}
