package org.apache.pulsar.broker.service;

import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.class */
public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronizer {
    protected PulsarService pulsar;
    protected BrokerService brokerService;
    protected String topicName;
    protected PulsarClientImpl client;
    protected volatile org.apache.pulsar.client.api.Producer<MetadataEvent> producer;
    protected volatile org.apache.pulsar.client.api.Consumer<MetadataEvent> consumer;
    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
    private volatile CompletableFuture<Void> closeFuture;
    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
    static final AtomicReferenceFieldUpdater<PulsarMetadataEventSynchronizer, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(PulsarMetadataEventSynchronizer.class, State.class, "state");
    private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>> listeners = new CopyOnWriteArrayList<>();
    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
    private volatile State state = State.Init;

    /* loaded from: input_file:org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer$State.class */
    public enum State {
        Init,
        Starting_Producer,
        Starting_Consumer,
        Started,
        Closing,
        Closed
    }

    public PulsarMetadataEventSynchronizer(PulsarService pulsarService, String str) {
        this.pulsar = pulsarService;
        this.brokerService = pulsarService.getBrokerService();
        this.topicName = str;
        if (StringUtils.isNotBlank(str)) {
            return;
        }
        log.info("Metadata synchronizer is disabled");
    }

    public void start() throws PulsarServerException {
        if (StringUtils.isBlank(this.topicName)) {
            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
            return;
        }
        log.info("Metadata event synchronizer is starting on topic {}", this.topicName);
        this.client = this.pulsar.getClient();
        if (STATE_UPDATER.compareAndSet(this, State.Init, State.Starting_Producer)) {
            startProducer();
        }
    }

    public CompletableFuture<Void> notify(MetadataEvent metadataEvent) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        publishAsync(metadataEvent, completableFuture);
        return completableFuture;
    }

    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> function) {
        this.listeners.add(function);
    }

    public String getClusterName() {
        return this.pulsar.getConfig().getClusterName();
    }

    private void publishAsync(MetadataEvent metadataEvent, CompletableFuture<Void> completableFuture) {
        if (!isProducerStarted()) {
            log.info("Producer is not started on {}, failed to publish {}", this.topicName, metadataEvent);
            completableFuture.completeExceptionally(new IllegalStateException("producer is not started yet"));
        }
        this.producer.newMessage().value(metadataEvent).sendAsync().thenAccept(messageId -> {
            log.info("successfully published metadata change event {}", metadataEvent);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.warn("failed to publish metadata update {}, will retry in {}", new Object[]{this.topicName, 1000, th});
            this.pulsar.getBrokerService().executor().schedule(() -> {
                publishAsync(metadataEvent, completableFuture);
            }, 1000L, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    private void startProducer() {
        if (isClosingOrClosed()) {
            log.info("[{}] Skip to start new producer because the synchronizer is closed", this.topicName);
        }
        if (this.producer != null) {
            log.error("[{}] Failed to start the producer because the producer has been set, state: {}", this.topicName, this.state);
        } else {
            log.info("[{}] Starting producer", this.topicName);
            this.client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(this.topicName).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(1000).createAsync().thenAccept(producer -> {
                this.backOff.reset();
                if (STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Starting_Consumer)) {
                    this.producer = producer;
                    log.info("producer is created successfully {}", this.topicName);
                    startConsumer();
                } else {
                    State state = this.state;
                    log.info("[{}] Closing the new producer because the synchronizer state is {}", producer, state);
                    CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                    closeResource(() -> {
                        return producer.closeAsync();
                    }, completableFuture);
                    completableFuture.thenRun(() -> {
                        log.info("[{}] Closed the new producer because the synchronizer state is {}", producer, state);
                    });
                }
            }).exceptionally(th -> {
                long next = this.backOff.next();
                log.warn("[{}] Failed to create producer ({}), retrying in {} s", new Object[]{this.topicName, th.getMessage(), Double.valueOf(next / 1000.0d)});
                this.brokerService.executor().schedule(this::startProducer, next, TimeUnit.MILLISECONDS);
                return null;
            });
        }
    }

    private void startConsumer() {
        if (isClosingOrClosed()) {
            log.info("[{}] Skip to start new consumer because the synchronizer is closed", this.topicName);
        }
        if (this.consumer != null) {
            log.error("[{}] Failed to start the consumer because the consumer has been set, state: {}", this.topicName, this.state);
        } else {
            log.info("[{}] Starting consumer", this.topicName);
            this.client.newConsumer(Schema.AVRO(MetadataEvent.class)).topic(new String[]{this.topicName}).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover).messageListener((consumer, message) -> {
                log.info("Processing metadata event for {} with listeners {}", ((MetadataEvent) message.getValue()).getPath(), Integer.valueOf(this.listeners.size()));
                try {
                    if (this.listeners.size() == 0) {
                        consumer.acknowledgeAsync(message);
                        return;
                    }
                    if (this.listeners.size() == 1) {
                        this.listeners.get(0).apply((MetadataEvent) message.getValue()).thenApply(r5 -> {
                            return consumer.acknowledgeAsync(message);
                        }).exceptionally((Function<Throwable, ? extends U>) th -> {
                            log.warn("Failed to synchronize {} for {}", new Object[]{message.getMessageId(), this.topicName, th.getCause()});
                            return null;
                        });
                    } else {
                        FutureUtil.waitForAll((Collection) this.listeners.stream().map(function -> {
                            return (CompletableFuture) function.apply((MetadataEvent) message.getValue());
                        }).collect(Collectors.toList())).thenApply(r52 -> {
                            return consumer.acknowledgeAsync(message);
                        }).exceptionally(th2 -> {
                            log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                            return null;
                        });
                    }
                } catch (Exception e) {
                    log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                }
            }).subscribeAsync().thenAccept(consumer2 -> {
                this.backOff.reset();
                if (STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Started)) {
                    this.consumer = consumer2;
                    log.info("successfully created consumer {}", this.topicName);
                    return;
                }
                State state = this.state;
                log.info("[{}] Closing the new consumer because the synchronizer state is {}", state);
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                closeResource(() -> {
                    return consumer2.closeAsync();
                }, completableFuture);
                completableFuture.thenRun(() -> {
                    log.info("[{}] Closed the new consumer because the synchronizer state is {}", state);
                });
            }).exceptionally(th -> {
                long next = this.backOff.next();
                log.warn("[{}] Failed to create consumer ({}), retrying in {} s", new Object[]{this.topicName, th.getMessage(), Double.valueOf(next / 1000.0d)});
                this.brokerService.executor().schedule(this::startConsumer, next, TimeUnit.MILLISECONDS);
                return null;
            });
        }
    }

    public boolean isStarted() {
        return this.state == State.Started;
    }

    public boolean isProducerStarted() {
        return this.state.ordinal() > State.Starting_Producer.ordinal() && this.state.ordinal() < State.Closing.ordinal();
    }

    public boolean isClosingOrClosed() {
        return this.state == State.Closing || this.state == State.Closed;
    }

    public synchronized CompletableFuture<Void> closeAsync() {
        int i = 0;
        while (!isClosingOrClosed()) {
            if (STATE_UPDATER.compareAndSet(this, State.Init, State.Closing) || STATE_UPDATER.compareAndSet(this, State.Starting_Producer, State.Closing) || STATE_UPDATER.compareAndSet(this, State.Starting_Consumer, State.Closing) || STATE_UPDATER.compareAndSet(this, State.Started, State.Closing)) {
                CompletableFuture<Void> completableFuture = new CompletableFuture<>();
                CompletableFuture<Void> completableFuture2 = new CompletableFuture<>();
                if (this.producer == null) {
                    completableFuture.complete(null);
                } else {
                    closeResource(() -> {
                        return this.producer.closeAsync();
                    }, completableFuture);
                }
                if (this.consumer == null) {
                    completableFuture2.complete(null);
                } else {
                    closeResource(() -> {
                        return this.consumer.closeAsync();
                    }, completableFuture2);
                }
                completableFuture.thenRun(() -> {
                    log.info("Successfully close producer {}", this.topicName);
                });
                completableFuture2.thenRun(() -> {
                    log.info("Successfully close consumer {}", this.topicName);
                });
                this.closeFuture = FutureUtil.waitForAll(Arrays.asList(completableFuture, completableFuture2));
                this.closeFuture.thenRun(() -> {
                    this.state = State.Closed;
                    log.info("Successfully close metadata store synchronizer {}", this.topicName);
                });
                return this.closeFuture;
            }
            i++;
            if (i > 100) {
                log.error("Unexpected error: the state can not be changed to closing {}, state: {}", this.topicName, this.state);
                return CompletableFuture.failedFuture(new RuntimeException("Unexpected error, the state can not be changed to closing"));
            }
        }
        return this.closeFuture;
    }

    private void closeResource(Supplier<CompletableFuture<Void>> supplier, CompletableFuture<Void> completableFuture) {
        if (supplier == null) {
            completableFuture.complete(null);
        } else {
            supplier.get().whenComplete((r13, th) -> {
                if (th == null) {
                    this.backOff.reset();
                    completableFuture.complete(null);
                } else {
                    long next = this.backOff.next();
                    log.warn("[{}] Exception: '{}' occurred while trying to close the %s. Retrying again in {} s.", new Object[]{this.topicName, th.getMessage(), supplier.getClass().getSimpleName(), Double.valueOf(next / 1000.0d), th});
                    this.brokerService.executor().schedule(() -> {
                        closeResource(supplier, completableFuture);
                    }, next, TimeUnit.MILLISECONDS);
                }
            });
        }
    }

    public String getTopicName() {
        return this.topicName;
    }

    public State getState() {
        return this.state;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1680149338:
                if (implMethodName.equals("lambda$startConsumer$84bf2754$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET /* 0 */:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer = (PulsarMetadataEventSynchronizer) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        log.info("Processing metadata event for {} with listeners {}", ((MetadataEvent) message.getValue()).getPath(), Integer.valueOf(this.listeners.size()));
                        try {
                            if (this.listeners.size() == 0) {
                                consumer.acknowledgeAsync(message);
                                return;
                            }
                            if (this.listeners.size() == 1) {
                                this.listeners.get(0).apply((MetadataEvent) message.getValue()).thenApply(r5 -> {
                                    return consumer.acknowledgeAsync(message);
                                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                                    log.warn("Failed to synchronize {} for {}", new Object[]{message.getMessageId(), this.topicName, th.getCause()});
                                    return null;
                                });
                            } else {
                                FutureUtil.waitForAll((Collection) this.listeners.stream().map(function -> {
                                    return (CompletableFuture) function.apply((MetadataEvent) message.getValue());
                                }).collect(Collectors.toList())).thenApply(r52 -> {
                                    return consumer.acknowledgeAsync(message);
                                }).exceptionally(th2 -> {
                                    log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                                    return null;
                                });
                            }
                        } catch (Exception e) {
                            log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
