/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMetadata;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.PartitionsChangedListener;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.ProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.RoundRobinPartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.SinglePartitionMessageRouterImpl;
import org.apache.pulsar.client.impl.TopicMetadataImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionedProducerImpl<T>
extends ProducerBase<T> {
    private static final Logger log = LoggerFactory.getLogger(PartitionedProducerImpl.class);
    private List<ProducerImpl<T>> producers;
    private MessageRouter routerPolicy;
    private final ProducerStatsRecorderImpl stats;
    private TopicMetadata topicMetadata;
    private volatile Timeout partitionsAutoUpdateTimeout = null;
    TopicsPartitionChangedListener topicsPartitionChangedListener;
    CompletableFuture<Void> partitionsAutoUpdateFuture = null;
    private TimerTask partitionsAutoUpdateTimerTask = new TimerTask(){

        public void run(Timeout timeout) throws Exception {
            if (timeout.isCancelled() || PartitionedProducerImpl.this.getState() != HandlerState.State.Ready) {
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] run partitionsAutoUpdateTimerTask for partitioned producer", (Object)PartitionedProducerImpl.this.topic);
            }
            if (PartitionedProducerImpl.this.partitionsAutoUpdateFuture == null || PartitionedProducerImpl.this.partitionsAutoUpdateFuture.isDone()) {
                PartitionedProducerImpl.this.partitionsAutoUpdateFuture = PartitionedProducerImpl.this.topicsPartitionChangedListener.onTopicsExtended((Collection<String>)ImmutableList.of((Object)PartitionedProducerImpl.this.topic));
            }
            PartitionedProducerImpl.this.partitionsAutoUpdateTimeout = PartitionedProducerImpl.this.client.timer().newTimeout(PartitionedProducerImpl.this.partitionsAutoUpdateTimerTask, PartitionedProducerImpl.this.conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
    };

    public PartitionedProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, int numPartitions, CompletableFuture<Producer<T>> producerCreatedFuture, Schema<T> schema, ProducerInterceptors interceptors) {
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
        this.producers = Lists.newArrayListWithCapacity((int)numPartitions);
        this.topicMetadata = new TopicMetadataImpl(numPartitions);
        this.routerPolicy = this.getMessageRouter();
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStatsRecorderImpl() : null;
        int maxPendingMessages = Math.min(conf.getMaxPendingMessages(), conf.getMaxPendingMessagesAcrossPartitions() / numPartitions);
        conf.setMaxPendingMessages(maxPendingMessages);
        this.start();
        if (conf.isAutoUpdatePartitions()) {
            this.topicsPartitionChangedListener = new TopicsPartitionChangedListener();
            this.partitionsAutoUpdateTimeout = client.timer().newTimeout(this.partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
        }
    }

    private MessageRouter getMessageRouter() {
        MessageRouter messageRouter;
        MessageRoutingMode messageRouteMode = this.conf.getMessageRoutingMode();
        switch (messageRouteMode) {
            case CustomPartition: {
                messageRouter = (MessageRouter)Preconditions.checkNotNull((Object)this.conf.getCustomMessageRouter());
                break;
            }
            case SinglePartition: {
                messageRouter = new SinglePartitionMessageRouterImpl(ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions()), this.conf.getHashingScheme());
                break;
            }
            default: {
                messageRouter = new RoundRobinPartitionMessageRouterImpl(this.conf.getHashingScheme(), ThreadLocalRandom.current().nextInt(this.topicMetadata.numPartitions()), this.conf.isBatchingEnabled(), TimeUnit.MICROSECONDS.toMillis(this.conf.batchingPartitionSwitchFrequencyIntervalMicros()));
            }
        }
        return messageRouter;
    }

    public String getProducerName() {
        return this.producers.get(0).getProducerName();
    }

    public long getLastSequenceId() {
        return this.producers.stream().map(Producer::getLastSequenceId).mapToLong(Long::longValue).max().orElse(-1L);
    }

    private void start() {
        AtomicReference createFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger();
        for (int partitionIndex = 0; partitionIndex < this.topicMetadata.numPartitions(); ++partitionIndex) {
            String partitionName = TopicName.get((String)this.topic).getPartition(partitionIndex).toString();
            ProducerImpl producer = this.client.newProducerImpl(partitionName, partitionIndex, this.conf, this.schema, this.interceptors, new CompletableFuture());
            this.producers.add(producer);
            producer.producerCreatedFuture().handle((prod, createException) -> {
                if (createException != null) {
                    this.setState(HandlerState.State.Failed);
                    createFail.compareAndSet(null, createException);
                }
                if (completed.incrementAndGet() == this.topicMetadata.numPartitions()) {
                    if (createFail.get() == null) {
                        this.setState(HandlerState.State.Ready);
                        log.info("[{}] Created partitioned producer", (Object)this.topic);
                        this.producerCreatedFuture().complete(this);
                    } else {
                        log.error("[{}] Could not create partitioned producer.", (Object)this.topic, (Object)((Throwable)createFail.get()).getCause());
                        this.closeAsync().handle((ok, closeException) -> {
                            this.producerCreatedFuture().completeExceptionally((Throwable)createFail.get());
                            this.client.cleanupProducer(this);
                            return null;
                        });
                    }
                }
                return null;
            });
        }
    }

    @Override
    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
        return this.internalSendWithTxnAsync(message, null);
    }

    @Override
    CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
        switch (this.getState()) {
            case Ready: 
            case Connecting: {
                break;
            }
            case Closing: 
            case Closed: {
                return FutureUtil.failedFuture((Throwable)new PulsarClientException.AlreadyClosedException("Producer already closed"));
            }
            case ProducerFenced: {
                return FutureUtil.failedFuture((Throwable)new PulsarClientException.ProducerFencedException("Producer was fenced"));
            }
            case Terminated: {
                return FutureUtil.failedFuture((Throwable)new PulsarClientException.TopicTerminatedException("Topic was terminated"));
            }
            case Failed: 
            case Uninitialized: {
                return FutureUtil.failedFuture((Throwable)new PulsarClientException.NotConnectedException());
            }
        }
        int partition = this.routerPolicy.choosePartition(message, this.topicMetadata);
        Preconditions.checkArgument((partition >= 0 && partition < this.topicMetadata.numPartitions() ? 1 : 0) != 0, (Object)("Illegal partition index chosen by the message routing policy: " + partition));
        return this.producers.get(partition).internalSendWithTxnAsync(message, txn);
    }

    public CompletableFuture<Void> flushAsync() {
        List<CompletableFuture> flushFutures = this.producers.stream().map(ProducerImpl::flushAsync).collect(Collectors.toList());
        return CompletableFuture.allOf(flushFutures.toArray(new CompletableFuture[flushFutures.size()]));
    }

    @Override
    void triggerFlush() {
        this.producers.forEach(ProducerImpl::triggerFlush);
    }

    public boolean isConnected() {
        return this.producers.stream().allMatch(ProducerImpl::isConnected);
    }

    public long getLastDisconnectedTimestamp() {
        long lastDisconnectedTimestamp = 0L;
        Optional<ProducerImpl> p = this.producers.stream().max(Comparator.comparingLong(ProducerImpl::getLastDisconnectedTimestamp));
        if (p.isPresent()) {
            lastDisconnectedTimestamp = p.get().getLastDisconnectedTimestamp();
        }
        return lastDisconnectedTimestamp;
    }

    @Override
    public CompletableFuture<Void> closeAsync() {
        if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
            return CompletableFuture.completedFuture(null);
        }
        this.setState(HandlerState.State.Closing);
        if (this.partitionsAutoUpdateTimeout != null) {
            this.partitionsAutoUpdateTimeout.cancel();
            this.partitionsAutoUpdateTimeout = null;
        }
        AtomicReference closeFail = new AtomicReference();
        AtomicInteger completed = new AtomicInteger(this.topicMetadata.numPartitions());
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        for (Producer producer : this.producers) {
            if (producer == null) continue;
            producer.closeAsync().handle((closed, ex) -> {
                if (ex != null) {
                    closeFail.compareAndSet(null, ex);
                }
                if (completed.decrementAndGet() == 0) {
                    if (closeFail.get() == null) {
                        this.setState(HandlerState.State.Closed);
                        closeFuture.complete(null);
                        log.info("[{}] Closed Partitioned Producer", (Object)this.topic);
                        this.client.cleanupProducer(this);
                    } else {
                        this.setState(HandlerState.State.Failed);
                        closeFuture.completeExceptionally((Throwable)closeFail.get());
                        log.error("[{}] Could not close Partitioned Producer", (Object)this.topic, (Object)((Throwable)closeFail.get()).getCause());
                    }
                }
                return null;
            });
        }
        return closeFuture;
    }

    public synchronized ProducerStatsRecorderImpl getStats() {
        if (this.stats == null) {
            return null;
        }
        this.stats.reset();
        for (int i = 0; i < this.topicMetadata.numPartitions(); ++i) {
            this.stats.updateCumulativeStats(this.producers.get(i).getStats());
        }
        return this.stats;
    }

    public List<ProducerImpl<T>> getProducers() {
        return this.producers.stream().collect(Collectors.toList());
    }

    @Override
    String getHandlerName() {
        return "partition-producer";
    }

    @VisibleForTesting
    public Timeout getPartitionsAutoUpdateTimeout() {
        return this.partitionsAutoUpdateTimeout;
    }

    private class TopicsPartitionChangedListener
    implements PartitionsChangedListener {
        private TopicsPartitionChangedListener() {
        }

        @Override
        public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) {
            CompletableFuture<Void> future = new CompletableFuture<Void>();
            if (topicsExtended.isEmpty() || !topicsExtended.contains(PartitionedProducerImpl.this.topic)) {
                future.complete(null);
                return future;
            }
            PartitionedProducerImpl.this.client.getPartitionsForTopic(PartitionedProducerImpl.this.topic).thenCompose(list -> {
                int oldPartitionNumber = PartitionedProducerImpl.this.topicMetadata.numPartitions();
                int currentPartitionNumber = list.size();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] partitions number. old: {}, new: {}", new Object[]{PartitionedProducerImpl.this.topic, oldPartitionNumber, currentPartitionNumber});
                }
                if (oldPartitionNumber == currentPartitionNumber) {
                    future.complete(null);
                    return future;
                }
                if (oldPartitionNumber < currentPartitionNumber) {
                    List futureList = list.subList(oldPartitionNumber, currentPartitionNumber).stream().map(partitionName -> {
                        int partitionIndex = TopicName.getPartitionIndex((String)partitionName);
                        ProducerImpl producer = new ProducerImpl(PartitionedProducerImpl.this.client, (String)partitionName, PartitionedProducerImpl.this.conf, new CompletableFuture(), partitionIndex, PartitionedProducerImpl.this.schema, PartitionedProducerImpl.this.interceptors);
                        PartitionedProducerImpl.this.producers.add(producer);
                        return producer.producerCreatedFuture();
                    }).collect(Collectors.toList());
                    ((CompletableFuture)FutureUtil.waitForAll(futureList).thenAccept(finalFuture -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] success create producers for extended partitions. old: {}, new: {}", new Object[]{PartitionedProducerImpl.this.topic, oldPartitionNumber, currentPartitionNumber});
                        }
                        PartitionedProducerImpl.this.topicMetadata = new TopicMetadataImpl(currentPartitionNumber);
                        future.complete(null);
                    })).exceptionally(ex -> {
                        log.warn("[{}] fail create producers for extended partitions. old: {}, new: {}", new Object[]{PartitionedProducerImpl.this.topic, oldPartitionNumber, currentPartitionNumber});
                        List sublist = PartitionedProducerImpl.this.producers.subList(oldPartitionNumber, PartitionedProducerImpl.this.producers.size());
                        sublist.forEach(newProducer -> newProducer.closeAsync());
                        sublist.clear();
                        future.completeExceptionally((Throwable)ex);
                        return null;
                    });
                    return null;
                }
                log.error("[{}] not support shrink topic partitions. old: {}, new: {}", new Object[]{PartitionedProducerImpl.this.topic, oldPartitionNumber, currentPartitionNumber});
                future.completeExceptionally((Throwable)new PulsarClientException.NotSupportedException("not support shrink topic partitions"));
                return future;
            });
            return future;
        }
    }
}

