package org.apache.pulsar.broker.service;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.metadata.api.MetadataEvent;
import org.apache.pulsar.metadata.api.MetadataEventSynchronizer;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer.class */
public class PulsarMetadataEventSynchronizer implements MetadataEventSynchronizer {
    private static final Logger log = LoggerFactory.getLogger(PulsarMetadataEventSynchronizer.class);
    protected PulsarService pulsar;
    protected BrokerService brokerService;
    protected String topicName;
    protected PulsarClientImpl client;
    protected volatile org.apache.pulsar.client.api.Producer<MetadataEvent> producer;
    protected volatile org.apache.pulsar.client.api.Consumer<MetadataEvent> consumer;
    public static final String SUBSCRIPTION_NAME = "metadata-syncer";
    private static final int MAX_PRODUCER_PENDING_SIZE = 1000;
    private final CopyOnWriteArrayList<Function<MetadataEvent, CompletableFuture<Void>>> listeners = new CopyOnWriteArrayList<>();
    private volatile boolean started = false;
    protected final Backoff backOff = new Backoff(100, TimeUnit.MILLISECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

    public PulsarMetadataEventSynchronizer(PulsarService pulsarService, String str) throws PulsarServerException {
        this.pulsar = pulsarService;
        this.brokerService = pulsarService.getBrokerService();
        this.topicName = str;
        if (StringUtils.isNotBlank(str)) {
            return;
        }
        log.info("Metadata synchronizer is disabled");
    }

    public void start() throws PulsarServerException {
        if (StringUtils.isBlank(this.topicName)) {
            log.info("metadata topic doesn't exist.. skipping metadata synchronizer init..");
            return;
        }
        this.client = (PulsarClientImpl) this.pulsar.getClient();
        startProducer();
        startConsumer();
        log.info("Metadata event synchronizer started on topic {}", this.topicName);
    }

    @Override // org.apache.pulsar.metadata.api.MetadataEventSynchronizer
    public CompletableFuture<Void> notify(MetadataEvent metadataEvent) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        publishAsync(metadataEvent, completableFuture);
        return completableFuture;
    }

    @Override // org.apache.pulsar.metadata.api.MetadataEventSynchronizer
    public void registerSyncListener(Function<MetadataEvent, CompletableFuture<Void>> function) {
        this.listeners.add(function);
    }

    @Override // org.apache.pulsar.metadata.api.MetadataEventSynchronizer
    public String getClusterName() {
        return this.pulsar.getConfig().getClusterName();
    }

    private void publishAsync(MetadataEvent metadataEvent, CompletableFuture<Void> completableFuture) {
        if (!this.started) {
            log.info("Producer is not started on {}, failed to publish {}", this.topicName, metadataEvent);
            completableFuture.completeExceptionally(new IllegalStateException("producer is not started yet"));
        }
        this.producer.newMessage().value(metadataEvent).sendAsync().thenAccept(messageId -> {
            log.info("successfully published metadata change event {}", metadataEvent);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.warn("failed to publish metadata update {}, will retry in {}", new Object[]{this.topicName, 1000, th});
            this.pulsar.getBrokerService().executor().schedule(() -> {
                publishAsync(metadataEvent, completableFuture);
            }, 1000L, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    private void startProducer() {
        log.info("[{}] Starting producer", this.topicName);
        this.client.newProducer(Schema.AVRO(MetadataEvent.class)).topic(this.topicName).messageRoutingMode(MessageRoutingMode.SinglePartition).enableBatching(false).enableBatching(false).sendTimeout(0, TimeUnit.SECONDS).maxPendingMessages(1000).createAsync().thenAccept(producer -> {
            this.producer = producer;
            this.started = true;
            log.info("producer is created successfully {}", this.topicName);
        }).exceptionally(th -> {
            long next = this.backOff.next();
            log.warn("[{}] Failed to create producer ({}), retrying in {} s", new Object[]{this.topicName, th.getMessage(), Double.valueOf(next / 1000.0d)});
            this.brokerService.executor().schedule(this::startProducer, next, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    private void startConsumer() {
        if (this.consumer != null) {
            return;
        }
        this.client.newConsumer(Schema.AVRO(MetadataEvent.class)).topic(this.topicName).subscriptionName(SUBSCRIPTION_NAME).ackTimeout(60L, TimeUnit.SECONDS).subscriptionType(SubscriptionType.Failover).messageListener((consumer, message) -> {
            log.info("Processing metadata event for {} with listeners {}", ((MetadataEvent) message.getValue()).getPath(), Integer.valueOf(this.listeners.size()));
            try {
                if (this.listeners.size() == 0) {
                    consumer.acknowledgeAsync((Message<?>) message);
                    return;
                }
                if (this.listeners.size() == 1) {
                    this.listeners.get(0).apply((MetadataEvent) message.getValue()).thenApply(r5 -> {
                        return consumer.acknowledgeAsync((Message<?>) message);
                    }).exceptionally((Function<Throwable, ? extends U>) th -> {
                        log.warn("Failed to synchronize {} for {}", new Object[]{message.getMessageId(), this.topicName, th.getCause()});
                        return null;
                    });
                } else {
                    FutureUtil.waitForAll((Collection<? extends CompletableFuture<?>>) this.listeners.stream().map(function -> {
                        return (CompletableFuture) function.apply((MetadataEvent) message.getValue());
                    }).collect(Collectors.toList())).thenApply(r52 -> {
                        return consumer.acknowledgeAsync((Message<?>) message);
                    }).exceptionally((Function<Throwable, ? extends U>) th2 -> {
                        log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                        return null;
                    });
                }
            } catch (Exception e) {
                log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
            }
        }).subscribeAsync().thenAccept(consumer2 -> {
            log.info("successfully created consumer {}", this.topicName);
            this.consumer = consumer2;
        }).exceptionally(th -> {
            long next = this.backOff.next();
            log.warn("[{}] Failed to create consumer ({}), retrying in {} s", new Object[]{this.topicName, th.getMessage(), Double.valueOf(next / 1000.0d)});
            this.brokerService.executor().schedule(this::startConsumer, next, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    public boolean isStarted() {
        return this.started;
    }

    @Override // org.apache.pulsar.metadata.api.MetadataEventSynchronizer
    public void close() {
        this.started = false;
        if (this.producer != null) {
            this.producer.closeAsync();
            this.producer = null;
        }
        if (this.consumer != null) {
            this.consumer.closeAsync();
            this.consumer = null;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1680149338:
                if (implMethodName.equals("lambda$startConsumer$84bf2754$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/pulsar/client/api/MessageListener") && serializedLambda.getFunctionalInterfaceMethodName().equals("received") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V") && serializedLambda.getImplClass().equals("org/apache/pulsar/broker/service/PulsarMetadataEventSynchronizer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/pulsar/client/api/Consumer;Lorg/apache/pulsar/client/api/Message;)V")) {
                    PulsarMetadataEventSynchronizer pulsarMetadataEventSynchronizer = (PulsarMetadataEventSynchronizer) serializedLambda.getCapturedArg(0);
                    return (consumer, message) -> {
                        log.info("Processing metadata event for {} with listeners {}", ((MetadataEvent) message.getValue()).getPath(), Integer.valueOf(this.listeners.size()));
                        try {
                            if (this.listeners.size() == 0) {
                                consumer.acknowledgeAsync((Message<?>) message);
                                return;
                            }
                            if (this.listeners.size() == 1) {
                                this.listeners.get(0).apply((MetadataEvent) message.getValue()).thenApply(r5 -> {
                                    return consumer.acknowledgeAsync((Message<?>) message);
                                }).exceptionally((Function<Throwable, ? extends U>) th -> {
                                    log.warn("Failed to synchronize {} for {}", new Object[]{message.getMessageId(), this.topicName, th.getCause()});
                                    return null;
                                });
                            } else {
                                FutureUtil.waitForAll((Collection<? extends CompletableFuture<?>>) this.listeners.stream().map(function -> {
                                    return (CompletableFuture) function.apply((MetadataEvent) message.getValue());
                                }).collect(Collectors.toList())).thenApply(r52 -> {
                                    return consumer.acknowledgeAsync((Message<?>) message);
                                }).exceptionally((Function<Throwable, ? extends U>) th2 -> {
                                    log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                                    return null;
                                });
                            }
                        } catch (Exception e) {
                            log.warn("Failed to synchronize {} for {}", message.getMessageId(), this.topicName);
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
