package org.apache.flink.streaming.connectors.pulsar;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.RuntimeContextInitializationContextAdapters;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.class */
public abstract class FlinkPulsarSinkBase<T> extends TwoPhaseCommitSinkFunction<T, PulsarTransactionState<T>, Void> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSinkBase.class);
    /** Admin endpoint; passed to {@code PulsarClientUtils.newAdminFromConf} in {@code open}. */
    protected String adminUrl;
    /** Client configuration used to look up the cached Pulsar client; its service URL must be non-null. */
    protected ClientConfigurationData clientConfigurationData;
    /** Sink options derived from {@link #properties} via {@code SourceSinkUtils.toCaceInsensitiveParams}. */
    protected final Map<String, String> caseInsensitiveParams;
    /** Producer options derived from {@link #properties}; applied with {@code loadConf} when a producer is built. */
    protected final Map<String, Object> producerConf;
    /** Raw user-supplied properties (never null). */
    protected final Properties properties;
    /** When true, acknowledgements decrement {@link #pendingRecords}; switched off in {@code open} if checkpointing is disabled. */
    protected boolean flushOnCheckpoint;
    /** Selects which send callback {@code initializeSendCallback} installs (record the failure vs. only log it). */
    protected boolean failOnWrite;
    /** Timeout, in milliseconds, given to newly created Pulsar transactions. */
    protected long transactionTimeout;
    /** Upper bound, in milliseconds, for waiting on transaction commit/abort futures. */
    protected long maxBlockTimeMs;
    /** Producer send timeout in milliseconds; forced to 0 by the constructor for EXACTLY_ONCE. */
    protected int sendTimeOutMs;
    /** Monitor guarding {@link #pendingRecords}. */
    protected final SerializableObject pendingRecordsLock;
    /**
     * Number of records still awaiting acknowledgement; decremented in {@code acknowledgeMessage} and
     * awaited in {@code producerFlush}. It is not incremented anywhere in this class -- presumably
     * subclasses do so when sending (TODO confirm).
     */
    protected long pendingRecords;
    /** Message ids collected per transaction; only initialised by the constructor for EXACTLY_ONCE. */
    protected ConcurrentHashMap<TxnID, List<MessageId>> tid2MessagesMap;
    /** Pending send futures per transaction; only initialised by the constructor for EXACTLY_ONCE. */
    protected ConcurrentHashMap<TxnID, List<CompletableFuture<MessageId>>> tid2FuturesMap;
    /** True when a default topic was supplied, in which case a single producer serves all records. */
    protected final boolean forcedTopic;
    /** The supplied default topic, or null when none was given. */
    protected final String defaultTopic;
    /** Schema used to open the serialization context and to derive the Pulsar schema for producers. */
    protected final PulsarSerializationSchema<T> serializationSchema;
    /** Optional custom router; when non-null, producers are built with CustomPartition routing. */
    protected final MessageRouter messageRouter;
    /** Asynchronous send failure recorded by the send callback; rethrown and reset by {@code checkErroneous}. */
    protected volatile transient Throwable failedWrite;
    /** Admin client created in {@code open} and closed in {@code producerClose}. */
    protected transient PulsarAdmin admin;
    /** Completion callback for asynchronous sends; built once by {@code initializeSendCallback}. */
    protected transient BiConsumer<MessageId, Throwable> sendCallback;
    /** Delivery guarantee; {@code initializeState} downgrades it to NONE when checkpointing is disabled. */
    protected PulsarSinkSemantic semantic;
    /** Producer for {@link #defaultTopic}; only created when {@link #forcedTopic} is true. */
    protected transient Producer<T> singleProducer;
    /** Lazily populated topic-to-producer cache; only created when {@link #forcedTopic} is false. */
    protected transient Map<String, Producer<T>> topic2Producer;

    /**
     * Per-checkpoint transaction handle used by the two-phase-commit sink.
     *
     * <p>An instance is "transactional" when it carries a {@link TxnID}; otherwise it is a
     * placeholder with no transaction attached. Equality and hashing consider only the
     * transaction id and the pending message list, not the live {@link Transaction}.
     */
    @VisibleForTesting
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase$PulsarTransactionState.class */
    public static class PulsarTransactionState<T> {
        /** Live transaction object; transient, and null on instances rebuilt by the serializer. */
        private final transient Transaction transaction;
        /** Ids of the messages recorded for this transaction. */
        private final List<MessageId> pendingMessages;

        /** Transaction id, or null when this state is not transactional. */
        @Nullable
        final TxnID transactionalId;

        /** Creates a non-transactional state with an empty message list. */
        @VisibleForTesting
        public PulsarTransactionState() {
            this(null, null, new ArrayList<>());
        }

        /**
         * Creates a state from its parts.
         *
         * @param txnID transaction id, or null for a non-transactional state
         * @param transaction live transaction, may be null
         * @param list message ids recorded for the transaction; stored by reference, not copied
         */
        @VisibleForTesting
        public PulsarTransactionState(@Nullable TxnID txnID, @Nullable Transaction transaction, List<MessageId> list) {
            this.transactionalId = txnID;
            this.transaction = transaction;
            this.pendingMessages = list;
        }

        /** Returns the live transaction, which may be null. */
        public Transaction getTransaction() {
            return this.transaction;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns true when a transaction id is attached. */
        public boolean isTransactional() {
            return this.transactionalId != null;
        }

        /** Returns the backing list of pending message ids (not a copy). */
        public List<MessageId> getPendingMessages() {
            return this.pendingMessages;
        }

        @Override
        public String toString() {
            String simpleName = getClass().getSimpleName();
            if (!isTransactional()) {
                return String.format("%s this state is not in transactional mode", simpleName);
            }
            return String.format(
                    "%s [transactionalId=%s] [pendingMessages=%s]",
                    simpleName,
                    this.transactionalId.toString(),
                    this.pendingMessages.size());
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            PulsarTransactionState<?> other = (PulsarTransactionState<?>) obj;
            if (!this.pendingMessages.equals(other.pendingMessages)) {
                return false;
            }
            if (this.transactionalId == null) {
                return other.transactionalId == null;
            }
            return this.transactionalId.equals(other.transactionalId);
        }

        @Override
        public int hashCode() {
            int messagesHash = this.pendingMessages.hashCode();
            int idHash = this.transactionalId == null ? 0 : this.transactionalId.hashCode();
            return 31 * messagesHash + idHash;
        }
    }

    /**
     * Serializer for {@link PulsarTransactionState}.
     *
     * <p>Wire format: a boolean "transactional" flag; when true it is followed by the transaction
     * id (most significant bits, then least significant bits), an int message count, and for each
     * message an int length followed by that many bytes of {@code MessageId.toByteArray()}.
     * The live transaction object is never written, so deserialized states carry a null transaction.
     */
    @VisibleForTesting
    @Internal
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase$TransactionStateSerializer.class */
    public static class TransactionStateSerializer<T> extends TypeSerializerSingleton<PulsarTransactionState<T>> {
        private static final long serialVersionUID = 1;

        /** Snapshot that recreates the serializer through its no-arg constructor. */
        /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase$TransactionStateSerializer$TransactionStateSerializerSnapshot.class */
        public static final class TransactionStateSerializerSnapshot<T> extends SimpleTypeSerializerSnapshot<PulsarTransactionState<T>> {
            public TransactionStateSerializerSnapshot() {
                super(TransactionStateSerializer::new);
            }
        }

        /** Declares the state type immutable, so the {@code copy} overloads return their argument as-is. */
        public boolean isImmutableType() {
            return true;
        }

        /* renamed from: createInstance, reason: merged with bridge method [inline-methods] */
        /** Returns null; no default instance is provided. */
        public PulsarTransactionState<T> m912createInstance() {
            return null;
        }

        /** Returns the given state unchanged. */
        public PulsarTransactionState<T> copy(PulsarTransactionState<T> pulsarTransactionState) {
            return pulsarTransactionState;
        }

        /** Returns the first argument unchanged; the reuse instance is ignored. */
        public PulsarTransactionState<T> copy(PulsarTransactionState<T> pulsarTransactionState, PulsarTransactionState<T> pulsarTransactionState2) {
            return pulsarTransactionState;
        }

        /** Returns -1 because the encoded form has variable length. */
        public int getLength() {
            return -1;
        }

        /**
         * Writes the state in the wire format described on the class.
         *
         * @param pulsarTransactionState state to encode
         * @param dataOutputView destination
         * @throws IOException if writing fails
         */
        public void serialize(PulsarTransactionState<T> pulsarTransactionState, DataOutputView dataOutputView) throws IOException {
            if (pulsarTransactionState.transactionalId == null) {
                dataOutputView.writeBoolean(false);
                return;
            }
            dataOutputView.writeBoolean(true);
            dataOutputView.writeLong(pulsarTransactionState.transactionalId.getMostSigBits());
            dataOutputView.writeLong(pulsarTransactionState.transactionalId.getLeastSigBits());
            List<MessageId> pending = pulsarTransactionState.getPendingMessages();
            dataOutputView.writeInt(pending.size());
            for (MessageId messageId : pending) {
                byte[] encoded = messageId.toByteArray();
                dataOutputView.writeInt(encoded.length);
                dataOutputView.write(encoded);
            }
        }

        /* renamed from: deserialize, reason: merged with bridge method [inline-methods] */
        /**
         * Reads a state previously written by {@link #serialize}.
         *
         * @param dataInputView source positioned at the start of a record
         * @return the decoded state; its transaction object is always null
         * @throws IOException if reading fails or the stream ends inside a record
         */
        public PulsarTransactionState<T> m911deserialize(DataInputView dataInputView) throws IOException {
            TxnID txnID = null;
            List<MessageId> pending = new ArrayList<>();
            if (dataInputView.readBoolean()) {
                txnID = new TxnID(dataInputView.readLong(), dataInputView.readLong());
                int count = dataInputView.readInt();
                for (int i = 0; i < count; i++) {
                    byte[] encoded = new byte[dataInputView.readInt()];
                    // readFully, not read: a plain read may return fewer bytes than requested,
                    // which would corrupt this message id and misalign everything after it.
                    dataInputView.readFully(encoded);
                    pending.add(MessageId.fromByteArray(encoded));
                }
            }
            return new PulsarTransactionState<>(txnID, null, pending);
        }

        /** Ignores the reuse instance and decodes a fresh state. */
        public PulsarTransactionState<T> deserialize(PulsarTransactionState<T> pulsarTransactionState, DataInputView dataInputView) throws IOException {
            return m911deserialize(dataInputView);
        }

        /**
         * Copies exactly one encoded record from source to target.
         *
         * <p>The whole record is transferred, including the message list, so that the source is
         * left positioned at the next record and the target receives a decodable record.
         *
         * @throws IOException if reading or writing fails
         */
        public void copy(DataInputView dataInputView, DataOutputView dataOutputView) throws IOException {
            boolean transactional = dataInputView.readBoolean();
            dataOutputView.writeBoolean(transactional);
            if (!transactional) {
                return;
            }
            dataOutputView.writeLong(dataInputView.readLong());
            dataOutputView.writeLong(dataInputView.readLong());
            int count = dataInputView.readInt();
            dataOutputView.writeInt(count);
            for (int i = 0; i < count; i++) {
                int length = dataInputView.readInt();
                byte[] encoded = new byte[length];
                dataInputView.readFully(encoded);
                dataOutputView.writeInt(length);
                dataOutputView.write(encoded);
            }
        }

        /** Returns a fresh {@link TransactionStateSerializerSnapshot}. */
        public TypeSerializerSnapshot<PulsarTransactionState<T>> snapshotConfiguration() {
            return new TransactionStateSerializerSnapshot<>();
        }
    }

    /**
     * Creates a sink with {@code AT_LEAST_ONCE} semantics by delegating to the seven-argument
     * constructor.
     *
     * @param str admin URL (stored as {@code adminUrl} by the delegated constructor)
     * @param optional default topic; when present, every record goes through a single producer
     * @param clientConfigurationData Pulsar client configuration
     * @param properties sink and producer options
     * @param pulsarSerializationSchema schema used to serialize records
     * @param messageRouter optional custom message router, may be null
     */
    public FlinkPulsarSinkBase(String str, Optional<String> optional, ClientConfigurationData clientConfigurationData, Properties properties, PulsarSerializationSchema<T> pulsarSerializationSchema, MessageRouter messageRouter) {
        this(str, optional, clientConfigurationData, properties, pulsarSerializationSchema, messageRouter, PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    public FlinkPulsarSinkBase(String str, Optional<String> optional, ClientConfigurationData clientConfigurationData, Properties properties, PulsarSerializationSchema<T> pulsarSerializationSchema, MessageRouter messageRouter, PulsarSinkSemantic pulsarSinkSemantic) {
        super(new TransactionStateSerializer(), VoidSerializer.INSTANCE);
        this.pendingRecordsLock = new SerializableObject();
        this.pendingRecords = 0L;
        this.adminUrl = (String) Preconditions.checkNotNull(str);
        this.semantic = pulsarSinkSemantic;
        if (optional.isPresent()) {
            this.forcedTopic = true;
            this.defaultTopic = optional.get();
        } else {
            this.forcedTopic = false;
            this.defaultTopic = null;
        }
        this.serializationSchema = pulsarSerializationSchema;
        this.messageRouter = messageRouter;
        this.clientConfigurationData = clientConfigurationData;
        this.properties = (Properties) Preconditions.checkNotNull(properties);
        this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
        this.producerConf = SourceSinkUtils.getProducerParams(Maps.fromProperties(properties));
        this.flushOnCheckpoint = SourceSinkUtils.flushOnCheckpoint(this.caseInsensitiveParams);
        this.failOnWrite = SourceSinkUtils.failOnWrite(this.caseInsensitiveParams);
        this.transactionTimeout = SourceSinkUtils.getTransactionTimeout(this.caseInsensitiveParams);
        this.maxBlockTimeMs = SourceSinkUtils.getMaxBlockTimeMs(this.caseInsensitiveParams);
        this.sendTimeOutMs = SourceSinkUtils.getSendTimeoutMs(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (pulsarSinkSemantic == PulsarSinkSemantic.EXACTLY_ONCE) {
            this.sendTimeOutMs = 0;
            this.tid2MessagesMap = new ConcurrentHashMap<>();
            this.tid2FuturesMap = new ConcurrentHashMap<>();
            this.clientConfigurationData.setEnableTransaction(true);
        }
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
    }

    /**
     * Creates an {@code AT_LEAST_ONCE} sink, building the client configuration from {@code str}
     * and {@code properties} via {@code PulsarClientUtils.newClientConf}.
     *
     * @param str value handed to {@code newClientConf}, must not be null -- presumably the
     *     Pulsar service URL (TODO confirm against PulsarClientUtils)
     * @param str2 admin URL (forwarded as the first argument of the six-argument constructor)
     * @param optional default topic; when present, every record goes through a single producer
     * @param properties sink and producer options
     * @param pulsarSerializationSchema schema used to serialize records
     * @param messageRouter optional custom message router, may be null
     */
    public FlinkPulsarSinkBase(String str, String str2, Optional<String> optional, Properties properties, PulsarSerializationSchema pulsarSerializationSchema, MessageRouter messageRouter) {
        this(str2, optional, PulsarClientUtils.newClientConf((String) Preconditions.checkNotNull(str), properties), properties, pulsarSerializationSchema, messageRouter);
    }

    /**
     * Takes a checkpoint snapshot.
     *
     * <p>Rethrows any recorded asynchronous send failure first, so a checkpoint is not taken on
     * top of a failed write, and then delegates to the superclass.
     *
     * @throws Exception if a send failure was recorded or the superclass snapshot fails
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkErroneous();
        super.snapshotState(functionSnapshotContext);
    }

    /**
     * Initializes checkpointed state.
     *
     * <p>If a delivery guarantee other than {@code NONE} was requested but checkpointing is
     * disabled, the sink logs a warning and falls back to {@code NONE} before delegating to the
     * superclass.
     *
     * @throws Exception if the superclass initialization fails
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        boolean guaranteeRequested = this.semantic != PulsarSinkSemantic.NONE;
        if (guaranteeRequested && !getRuntimeContext().isCheckpointingEnabled()) {
            log.warn("Using {} semantic, but checkpointing is not enabled. Switching to {} semantic.", this.semantic, PulsarSinkSemantic.NONE);
            this.semantic = PulsarSinkSemantic.NONE;
        }
        super.initializeState(functionInitializationContext);
    }

    /**
     * Prepares the sink for writing.
     *
     * <p>Disables flush-on-checkpoint when checkpointing is off, creates the admin client, opens
     * the serialization schema with a "user" metric group, and then either uploads the schema
     * and creates the single producer for the default topic, or creates the empty per-topic
     * producer cache.
     *
     * @throws Exception if the admin client, schema or producer cannot be set up
     */
    public void open(Configuration configuration) throws Exception {
        if (this.flushOnCheckpoint) {
            if (!getRuntimeContext().isCheckpointingEnabled()) {
                log.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
                this.flushOnCheckpoint = false;
            }
        }

        this.admin = PulsarClientUtils.newAdminFromConf(this.adminUrl, this.clientConfigurationData);
        this.serializationSchema.open(
                RuntimeContextInitializationContextAdapters.serializationAdapter(
                        getRuntimeContext(), metricGroup -> metricGroup.addGroup("user")));

        if (this.forcedTopic) {
            uploadSchema(this.defaultTopic);
            this.singleProducer = createProducer(this.clientConfigurationData, this.producerConf, this.defaultTopic, this.serializationSchema.getSchema());
            return;
        }
        this.topic2Producer = new HashMap<>();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the completion callback for asynchronous sends, once.
     *
     * <p>In fail-on-write mode the first failure is recorded in {@code failedWrite} so that
     * {@code checkErroneous} can rethrow it; later failures are only logged. Otherwise failures
     * are logged and ignored.
     *
     * <p>In both modes every completed send is acknowledged, successful or not.
     * {@code producerFlush} blocks until the pending-record counter reaches zero and only then
     * checks for errors, so a failed send that was never acknowledged would leave the flush
     * waiting forever instead of surfacing the recorded failure.
     */
    public void initializeSendCallback() {
        if (this.sendCallback != null) {
            return;
        }
        if (this.failOnWrite) {
            this.sendCallback = (messageId, th) -> {
                if (th != null) {
                    if (this.failedWrite == null) {
                        this.failedWrite = th;
                    } else {
                        // An earlier failure is already waiting to be rethrown; keep that one.
                        log.warn("callback error {}", th);
                    }
                }
                acknowledgeMessage();
            };
        } else {
            this.sendCallback = (messageId2, th2) -> {
                if (this.failedWrite == null && th2 != null) {
                    log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(th2));
                }
                acknowledgeMessage();
            };
        }
    }

    /**
     * Uploads the schema info of the serialization schema for the given topic through the admin
     * client.
     *
     * @param str topic name
     */
    private void uploadSchema(String str) {
        Schema<T> pulsarSchema = this.serializationSchema.getSchema();
        SchemaUtils.uploadPulsarSchema(this.admin, str, pulsarSchema.getSchemaInfo());
    }

    /**
     * Shuts the sink down.
     *
     * <p>Rethrows a recorded send failure, then flushes and closes the producers and admin
     * client via {@code producerClose}, then checks once more for a failure recorded during the
     * final flush.
     *
     * <p>NOTE(review): when the first check throws, {@code producerClose} is skipped, so the
     * producers and admin client are not closed by this method. The order matters because
     * {@code producerClose} starts with a flush that can block; confirm before reordering.
     *
     * @throws Exception if a send failure was recorded or closing fails
     */
    public void close() throws Exception {
        checkErroneous();
        producerClose();
        checkErroneous();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the producer to use for a topic.
     *
     * <p>With a forced default topic the single producer is returned regardless of the argument.
     * Otherwise the producer is taken from the per-topic cache, uploading the schema and creating
     * the producer on first use.
     *
     * @param str topic name
     * @return the producer for that topic
     */
    public Producer<T> getProducer(String str) {
        if (this.forcedTopic) {
            return this.singleProducer;
        }
        if (!this.topic2Producer.containsKey(str)) {
            uploadSchema(str);
            Producer<T> created = createProducer(this.clientConfigurationData, this.producerConf, str, this.serializationSchema.getSchema());
            this.topic2Producer.put(str, created);
            return created;
        }
        return this.topic2Producer.get(str);
    }

    /**
     * Creates a producer on the cached client for the given topic.
     *
     * <p>The producer uses the sink's send timeout, a 100 ms maximum batching delay and a batch
     * size limit of {@code Commands.DEFAULT_MAX_MESSAGE_SIZE}, followed by the caller-supplied
     * options. When a message router is configured it is installed with CustomPartition routing.
     *
     * @param clientConfigurationData configuration identifying the cached client
     * @param map producer options applied through {@code loadConf}
     * @param str topic name
     * @param schema Pulsar schema of the produced messages
     * @return the created producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} if creation fails
     */
    protected Producer<T> createProducer(ClientConfigurationData clientConfigurationData, Map<String, Object> map, String str, Schema<T> schema) {
        try {
            PulsarClientImpl client = CachedPulsarClient.getOrCreate(clientConfigurationData);
            ProducerBuilder<T> builder = client.newProducer(schema)
                    .topic(str)
                    .sendTimeout(this.sendTimeOutMs, TimeUnit.MILLISECONDS)
                    .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(Commands.DEFAULT_MAX_MESSAGE_SIZE)
                    .loadConf(map);
            if (this.messageRouter == null) {
                return builder.create();
            }
            return builder.messageRoutingMode(MessageRoutingMode.CustomPartition)
                    .messageRouter(this.messageRouter)
                    .create();
        } catch (PulsarClientException clientFailure) {
            log.error("Failed to create producer for topic {}", str);
            throw new RuntimeException(clientFailure);
        }
    }

    /**
     * Flushes all producers and waits until every outstanding send has completed.
     *
     * <p>For a transactional state, each pending send future of the transaction is awaited and
     * its message id appended to the transaction's entry in {@code tid2MessagesMap}. The method
     * then blocks until the pending-record counter reaches zero and finally rethrows any recorded
     * send failure.
     *
     * <p>A null state, or a transaction without registered futures, is treated as having nothing
     * to await; this keeps {@code close()} from failing with a NullPointerException when it runs
     * before a transaction exists. When the thread is interrupted while waiting, the interrupt
     * flag is restored before the exception is propagated.
     *
     * @param pulsarTransactionState the transaction being flushed, may be null
     * @throws RuntimeException if waiting is interrupted or a pending send failed
     * @throws Exception if a producer flush fails or a send failure was recorded
     */
    public void producerFlush(PulsarTransactionState<T> pulsarTransactionState) throws Exception {
        if (this.singleProducer != null) {
            this.singleProducer.flush();
        } else if (this.topic2Producer != null) {
            for (Producer<T> producer : this.topic2Producer.values()) {
                producer.flush();
            }
        }
        if (pulsarTransactionState != null && pulsarTransactionState.isTransactional()) {
            TxnID txnID = pulsarTransactionState.transactionalId;
            List<CompletableFuture<MessageId>> futures = this.tid2FuturesMap.get(txnID);
            if (futures != null) {
                for (CompletableFuture<MessageId> future : futures) {
                    try {
                        MessageId messageId = future.get();
                        this.tid2MessagesMap.computeIfAbsent(txnID, key -> new ArrayList<>()).add(messageId);
                        log.debug("transaction {} add the message {} to messageIdLIst", txnID, messageId);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt so callers further up can observe it.
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    } catch (ExecutionException e2) {
                        throw new RuntimeException(e2);
                    }
                }
            }
        }
        synchronized (this.pendingRecordsLock) {
            while (this.pendingRecords > 0) {
                try {
                    this.pendingRecordsLock.wait();
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Flushing got interrupted while checkpointing", e3);
                }
            }
        }
        checkErroneous();
    }

    /**
     * Opens a new Pulsar transaction on the cached client using the configured transaction
     * timeout, and waits for it to be created.
     *
     * <p>NOTE(review): the 100 ms pause before building the transaction has no reason visible in
     * this file; confirm its purpose before removing it.
     *
     * @return the newly created transaction
     * @throws Exception if the client cannot be obtained or the transaction cannot be created
     */
    private Transaction createTransaction() throws Exception {
        PulsarClientImpl client = CachedPulsarClient.getOrCreate(this.clientConfigurationData);
        Thread.sleep(100L);
        return client.newTransaction()
                .withTransactionTimeout(this.transactionTimeout, TimeUnit.MILLISECONDS)
                .build()
                .get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: beginTransaction, reason: merged with bridge method [inline-methods] */
    /**
     * Starts the transaction state for the next checkpoint interval.
     *
     * <p>For {@code EXACTLY_ONCE} a new Pulsar transaction is opened and registered in both
     * bookkeeping maps; the returned state shares its message list with {@code tid2MessagesMap}.
     * For {@code AT_LEAST_ONCE} and {@code NONE} the current state is carried over when it is
     * transactional, otherwise an empty non-transactional state is returned.
     *
     * @return the state for the new transaction
     * @throws UnsupportedOperationException for any other semantic
     * @throws Exception if the Pulsar transaction cannot be created
     */
    public PulsarTransactionState<T> m909beginTransaction() throws Exception {
        switch (this.semantic) {
            case EXACTLY_ONCE: {
                log.debug("transaction is begining in EXACTLY_ONCE mode");
                Transaction transaction = createTransaction();
                TransactionImpl impl = (TransactionImpl) transaction;
                long leastBits = impl.getTxnIdLeastBits();
                long mostBits = impl.getTxnIdMostBits();
                TxnID mapKey = new TxnID(mostBits, leastBits);
                List<MessageId> messages = this.tid2MessagesMap.computeIfAbsent(mapKey, id -> new ArrayList<>());
                this.tid2FuturesMap.computeIfAbsent(mapKey, id -> new ArrayList<>());
                return new PulsarTransactionState<>(new TxnID(mostBits, leastBits), transaction, messages);
            }
            case AT_LEAST_ONCE:
            case NONE: {
                PulsarTransactionState<T> current = currentTransaction();
                if (current == null || current.transactionalId == null) {
                    return new PulsarTransactionState<>(null, null, new ArrayList<>());
                }
                return new PulsarTransactionState<>(current.transactionalId, current.getTransaction(), current.getPendingMessages());
            }
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * First phase of the two-phase commit: makes sure everything sent so far has reached Pulsar.
     *
     * <p>For {@code EXACTLY_ONCE} and {@code AT_LEAST_ONCE} the producers are flushed; for
     * {@code NONE} nothing is flushed. Any recorded send failure is rethrown at the end.
     *
     * <p>The debug output reports the transaction that was passed in and tolerates a missing
     * bookkeeping entry, so that logging can never fail the pre-commit.
     *
     * @param pulsarTransactionState the transaction being pre-committed
     * @throws UnsupportedOperationException for an unknown semantic
     * @throws Exception if flushing fails or a send failure was recorded
     */
    public void preCommit(PulsarTransactionState<T> pulsarTransactionState) throws Exception {
        switch (this.semantic) {
            case EXACTLY_ONCE:
            case AT_LEAST_ONCE:
                producerFlush(pulsarTransactionState);
                break;
            case NONE:
                break;
            default:
                throw new UnsupportedOperationException("Not implemented semantic");
        }
        if (pulsarTransactionState.isTransactional()) {
            TxnID txnID = pulsarTransactionState.transactionalId;
            List<MessageId> flushed = this.tid2MessagesMap == null ? null : this.tid2MessagesMap.get(txnID);
            log.debug("{} preCommit with pending message size {}", txnID, flushed == null ? 0 : flushed.size());
        } else {
            log.debug("in AT_LEAST_ONCE mode, producer was flushed by preCommit");
        }
        checkErroneous();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Second phase of the two-phase commit: commits the Pulsar transaction, waiting at most
     * {@code maxBlockTimeMs} milliseconds. Non-transactional states are ignored.
     *
     * <p>After a successful commit the transaction's entries are dropped from both bookkeeping
     * maps so that they do not accumulate over the lifetime of the job. The bookkeeping runs
     * outside the try block and is null-tolerant, so it cannot turn a commit that the broker
     * already accepted into a failure.
     *
     * @param pulsarTransactionState the transaction to commit
     * @throws RuntimeException if the commit fails, times out or is interrupted (the interrupt
     *     flag is restored in the latter case)
     */
    public void commit(PulsarTransactionState<T> pulsarTransactionState) {
        if (!pulsarTransactionState.isTransactional()) {
            return;
        }
        TxnID txnID = pulsarTransactionState.transactionalId;
        log.debug("transaction {} is committing", txnID.toString());
        try {
            pulsarTransactionState.getTransaction().commit().get(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        List<MessageId> committed = this.tid2MessagesMap == null ? null : this.tid2MessagesMap.remove(txnID);
        if (this.tid2FuturesMap != null) {
            this.tid2FuturesMap.remove(txnID);
        }
        log.debug("transaction {} is committed with messageID size {}", txnID.toString(), committed == null ? 0 : committed.size());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts the Pulsar transaction, waiting at most {@code maxBlockTimeMs} milliseconds.
     * Non-transactional states are ignored.
     *
     * <p>After a successful abort the transaction's entries are dropped from both bookkeeping
     * maps; an aborted transaction is never committed, so keeping them would only leak memory.
     *
     * @param pulsarTransactionState the transaction to abort
     * @throws RuntimeException if the abort fails, times out or is interrupted (the interrupt
     *     flag is restored in the latter case)
     */
    public void abort(PulsarTransactionState<T> pulsarTransactionState) {
        if (!pulsarTransactionState.isTransactional()) {
            return;
        }
        TxnID txnID = pulsarTransactionState.transactionalId;
        CompletableFuture<Void> abort = pulsarTransactionState.getTransaction().abort();
        log.debug("transaction {} is aborting", txnID.toString());
        try {
            abort.get(this.maxBlockTimeMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        log.debug("transaction {} is aborted", txnID.toString());
        if (this.tid2MessagesMap != null) {
            this.tid2MessagesMap.remove(txnID);
        }
        if (this.tid2FuturesMap != null) {
            this.tid2FuturesMap.remove(txnID);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Commits a transaction restored from a checkpoint, addressing it by id through the
     * transaction coordinator client. Non-transactional states are ignored, and a transaction
     * whose status no longer allows a commit is treated as already committed.
     *
     * @param pulsarTransactionState the restored transaction
     * @throws RuntimeException if the client cannot be obtained or the coordinator reports any
     *     other error
     */
    public void recoverAndCommit(PulsarTransactionState<T> pulsarTransactionState) {
        if (!pulsarTransactionState.isTransactional()) {
            return;
        }
        try {
            log.debug("transaction {} is recoverAndCommit...", pulsarTransactionState.transactionalId);
            PulsarClientImpl client = CachedPulsarClient.getOrCreate(this.clientConfigurationData);
            client.getTcClient().commit(pulsarTransactionState.transactionalId);
        } catch (PulsarClientException clientFailure) {
            log.error("Failed to getOrCreate a PulsarClient");
            throw new RuntimeException(clientFailure);
        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException alreadyDone) {
            log.debug("transaction {} is already committed...", pulsarTransactionState.transactionalId);
        } catch (TransactionCoordinatorClientException coordinatorFailure) {
            throw new RuntimeException(coordinatorFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Aborts a transaction restored from a checkpoint, addressing it by id through the
     * transaction coordinator client. Non-transactional states are ignored, and a transaction
     * whose status no longer allows an abort is treated as already aborted.
     *
     * @param pulsarTransactionState the restored transaction
     * @throws RuntimeException if the client cannot be obtained or the coordinator reports any
     *     other error
     */
    public void recoverAndAbort(PulsarTransactionState<T> pulsarTransactionState) {
        if (!pulsarTransactionState.isTransactional()) {
            return;
        }
        try {
            log.debug("transaction {} is recoverAndAbort...", pulsarTransactionState.transactionalId);
            PulsarClientImpl client = CachedPulsarClient.getOrCreate(this.clientConfigurationData);
            client.getTcClient().abort(pulsarTransactionState.transactionalId);
        } catch (PulsarClientException clientFailure) {
            log.error("Failed to getOrCreate a PulsarClient");
            throw new RuntimeException(clientFailure);
        } catch (TransactionCoordinatorClientException.InvalidTxnStatusException alreadyDone) {
            log.debug("transaction {} is already aborted...", pulsarTransactionState.transactionalId);
        } catch (TransactionCoordinatorClientException coordinatorFailure) {
            throw new RuntimeException(coordinatorFailure);
        }
    }

    /**
     * Flushes outstanding records and releases the admin client and all producers.
     *
     * <p>Every resource is closed even when the flush or an earlier close fails, so a single
     * failure no longer leaks the remaining connections. The first failure is rethrown once
     * cleanup is finished; later failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure raised by the flush or by closing a resource
     */
    protected void producerClose() throws Exception {
        Exception failure = null;
        try {
            producerFlush(currentTransaction());
        } catch (Exception e) {
            failure = e;
        }
        failure = closeAndCollect(this.admin, failure);
        if (this.singleProducer != null) {
            failure = closeAndCollect(this.singleProducer, failure);
        } else if (this.topic2Producer != null) {
            for (Producer<T> producer : this.topic2Producer.values()) {
                failure = closeAndCollect(producer, failure);
            }
            this.topic2Producer.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Closes one resource (null is skipped) and folds any failure into the earlier one. */
    private static Exception closeAndCollect(AutoCloseable resource, Exception earlier) {
        if (resource == null) {
            return earlier;
        }
        try {
            resource.close();
            return earlier;
        } catch (Exception e) {
            if (earlier == null) {
                return e;
            }
            earlier.addSuppressed(e);
            return earlier;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Rethrows a recorded asynchronous send failure, if there is one, and clears it so that it
     * is reported only once.
     *
     * @throws Exception wrapping the recorded failure
     */
    public void checkErroneous() throws Exception {
        Throwable recorded = this.failedWrite;
        if (recorded == null) {
            return;
        }
        this.failedWrite = null;
        throw new Exception("Failed to send data to Pulsar: " + recorded.getMessage(), recorded);
    }

    /**
     * Marks one pending record as completed. Does nothing unless flush-on-checkpoint is enabled;
     * otherwise decrements the pending counter under its lock and wakes all waiters once the
     * counter reaches zero.
     */
    private void acknowledgeMessage() {
        if (!this.flushOnCheckpoint) {
            return;
        }
        synchronized (this.pendingRecordsLock) {
            this.pendingRecords -= 1;
            boolean drained = this.pendingRecords == 0;
            if (drained) {
                this.pendingRecordsLock.notifyAll();
            }
        }
    }
}
