package org.apache.flink.streaming.connectors.pulsar;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.pulsar.internal.CachedPulsarClient;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarSerializationSchema;
import org.apache.flink.streaming.connectors.pulsar.internal.SchemaUtils;
import org.apache.flink.streaming.connectors.pulsar.internal.SourceSinkUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializableObject;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.protocol.Commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSinkBase.class */
/**
 * Base class for Flink sink functions that publish records to Pulsar.
 *
 * <p>The class owns the Pulsar admin client and the producers (either one producer for a fixed
 * default topic, or one lazily created producer per topic), uploads the serialization schema
 * before a producer is created, and keeps the bookkeeping needed to flush outstanding sends
 * when a checkpoint is taken.
 *
 * <p>This class only decrements {@link #pendingRecords} (in the send callback); the matching
 * increment is not part of this class and is presumably done by subclasses for every
 * asynchronous send they issue while {@link #flushOnCheckpoint} is enabled.
 *
 * @param <T> type of the records handled by the serialization schema
 */
public abstract class FlinkPulsarSinkBase<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSinkBase.class);

    /** URL used to build the {@link PulsarAdmin} client in {@link #open(Configuration)}. */
    protected String adminUrl;
    /** Client configuration used for both the admin client and the cached Pulsar client. */
    protected ClientConfigurationData clientConfigurationData;
    /** Sink parameters derived from {@link #properties} via {@code SourceSinkUtils}. */
    protected final Map<String, String> caseInsensitiveParams;
    /** Producer settings derived from {@link #properties}, applied with {@code loadConf}. */
    protected final Map<String, Object> producerConf;
    protected final Properties properties;
    /** Whether {@link #snapshotState} flushes and waits for all pending sends. */
    protected boolean flushOnCheckpoint;
    /** Whether a failed send is recorded in {@link #failedWrite} (true) or only logged (false). */
    protected boolean failOnWrite;
    /** Monitor guarding {@link #pendingRecords}. */
    protected final SerializableObject pendingRecordsLock;
    /** Number of sends that have not been acknowledged by the send callback yet. */
    protected long pendingRecords;
    /** True when a default topic was supplied; then only {@link #singleProducer} is used. */
    protected final boolean forcedTopic;
    /** The fixed target topic, or {@code null} when {@link #forcedTopic} is false. */
    protected final String defaultTopic;
    protected final PulsarSerializationSchema<T> serializationSchema;
    /** First send failure recorded by the callback; rethrown and reset by {@link #checkErroneous()}. */
    protected volatile transient Throwable failedWrite;
    protected transient PulsarAdmin admin;
    protected transient BiConsumer<MessageId, Throwable> sendCallback;
    protected transient Producer<byte[]> singleProducer;
    protected transient Map<String, Producer<byte[]>> topic2Producer;

    /**
     * Creates the sink base from an explicit client configuration.
     *
     * @param adminUrl            Pulsar admin URL; must not be {@code null}
     * @param defaultTopicName    fixed target topic; when empty, producers are created per topic
     * @param clientConf          Pulsar client configuration; must carry a service URL
     * @param properties          sink and producer properties; must not be {@code null}
     * @param serializationSchema schema used to obtain the Pulsar schema for producers
     * @throws NullPointerException     if {@code adminUrl}, {@code clientConf} or
     *                                  {@code properties} is {@code null}
     * @throws IllegalArgumentException if the client configuration has no service URL
     */
    public FlinkPulsarSinkBase(String adminUrl, Optional<String> defaultTopicName, ClientConfigurationData clientConf, Properties properties, PulsarSerializationSchema<T> serializationSchema) {
        this.pendingRecordsLock = new SerializableObject();
        this.pendingRecords = 0L;
        this.adminUrl = Preconditions.checkNotNull(adminUrl);
        this.forcedTopic = defaultTopicName.isPresent();
        this.defaultTopic = defaultTopicName.orElse(null);
        this.serializationSchema = serializationSchema;
        // Fail with a clear message instead of a bare NPE on getServiceUrl() below.
        this.clientConfigurationData = Preconditions.checkNotNull(clientConf, "clientConfigurationData must not be null");
        this.properties = Preconditions.checkNotNull(properties);
        this.caseInsensitiveParams = SourceSinkUtils.toCaceInsensitiveParams(Maps.fromProperties(properties));
        this.producerConf = SourceSinkUtils.getProducerParams(Maps.fromProperties(properties));
        this.flushOnCheckpoint = SourceSinkUtils.flushOnCheckpoint(this.caseInsensitiveParams);
        this.failOnWrite = SourceSinkUtils.failOnWrite(this.caseInsensitiveParams);
        CachedPulsarClient.setCacheSize(SourceSinkUtils.getClientCacheSize(this.caseInsensitiveParams));
        if (this.clientConfigurationData.getServiceUrl() == null) {
            throw new IllegalArgumentException("ServiceUrl must be supplied in the client configuration");
        }
    }

    /**
     * Creates the sink base from a service URL; the client configuration is built with
     * {@code PulsarClientUtils.newClientConf(serviceUrl, properties)}.
     *
     * @param serviceUrl          Pulsar service URL; must not be {@code null}
     * @param adminUrl            Pulsar admin URL; must not be {@code null}
     * @param defaultTopicName    fixed target topic; when empty, producers are created per topic
     * @param properties          sink and producer properties
     * @param serializationSchema schema used to obtain the Pulsar schema for producers
     */
    public FlinkPulsarSinkBase(String serviceUrl, String adminUrl, Optional<String> defaultTopicName, Properties properties, PulsarSerializationSchema<T> serializationSchema) {
        this(adminUrl, defaultTopicName, PulsarClientUtils.newClientConf(Preconditions.checkNotNull(serviceUrl), properties), properties, serializationSchema);
    }

    /**
     * Rethrows any recorded send failure and, when flushing on checkpoint is enabled, flushes
     * all producers and waits until no sends are pending.
     *
     * @throws IllegalStateException if records are still pending after the flush
     * @throws Exception             if a send failure was recorded
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        checkErroneous();
        if (this.flushOnCheckpoint) {
            producerFlush();
            synchronized (this.pendingRecordsLock) {
                if (this.pendingRecords != 0) {
                    throw new IllegalStateException("Pending record count must be zero at this point " + this.pendingRecords);
                }
                // A failure may have been recorded by a callback while the flush was waiting.
                checkErroneous();
            }
        }
    }

    /** No state is restored by this class. */
    @Override
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Creates the admin client, opens the serialization schema and prepares the producer(s).
     * With a fixed topic the schema is uploaded and the single producer is created eagerly;
     * otherwise only the per-topic producer cache is initialised.
     */
    @Override
    public void open(Configuration configuration) throws Exception {
        if (this.flushOnCheckpoint && !getRuntimeContext().isCheckpointingEnabled()) {
            log.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }
        this.admin = PulsarClientUtils.newAdminFromConf(this.adminUrl, this.clientConfigurationData);
        this.serializationSchema.open(() -> getRuntimeContext().getMetricGroup().addGroup("user"));
        if (this.forcedTopic) {
            uploadSchema(this.defaultTopic);
            this.singleProducer = createProducer(this.clientConfigurationData, this.producerConf, this.defaultTopic);
        } else {
            this.topic2Producer = new HashMap<>();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Lazily installs the callback invoked when an asynchronous send completes.
     *
     * <p>Every completion is acknowledged, successful or not, so that a thread waiting in
     * {@link #producerFlush()} is always released. With {@link #failOnWrite} the first failure
     * is stored in {@link #failedWrite} (later ones are logged) and surfaced by
     * {@link #checkErroneous()}; otherwise failures are only logged.
     */
    public void initializeSendCallback() {
        if (this.sendCallback != null) {
            return;
        }
        if (this.failOnWrite) {
            this.sendCallback = (messageId, error) -> {
                if (error != null) {
                    if (this.failedWrite == null) {
                        this.failedWrite = error;
                    } else {
                        // An earlier failure is already waiting to be thrown; keep that one.
                        log.warn("callback error", error);
                    }
                }
                // Acknowledge even on failure: otherwise pendingRecords never drops to zero and
                // producerFlush() would block forever instead of letting the error be reported.
                acknowledgeMessage();
            };
        } else {
            this.sendCallback = (messageId, error) -> {
                if (this.failedWrite == null && error != null) {
                    log.error("Error while sending message to Pulsar: {}", ExceptionUtils.stringifyException(error));
                }
                acknowledgeMessage();
            };
        }
    }

    /** Uploads the serialization schema's Pulsar schema info for the given topic. */
    private void uploadSchema(String topic) {
        SchemaUtils.uploadPulsarSchema(this.admin, topic, this.serializationSchema.getPulsarSchema().getSchemaInfo());
    }

    /**
     * Flushes and closes all Pulsar resources and reports any send failure.
     *
     * <p>Resources are released even when a send failure is already recorded; the first
     * problem encountered is thrown and later ones are attached to it as suppressed.
     *
     * @throws Exception if a send failed or closing a resource failed
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        try {
            checkErroneous();
        } catch (Exception e) {
            failure = e;
        }
        try {
            producerClose();
            // The final flush may have completed sends that failed.
            checkErroneous();
        } catch (Exception e) {
            failure = chain(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the producer for a topic. With a fixed topic this is always the single producer;
     * otherwise the producer is looked up in the cache and, on a miss, the schema is uploaded
     * and a new producer is created and cached.
     *
     * @param topic target topic (ignored when a fixed topic is configured)
     * @return the producer to use for {@code topic}
     */
    public Producer<byte[]> getProducer(String topic) {
        if (this.forcedTopic) {
            return this.singleProducer;
        }
        Producer<byte[]> cached = this.topic2Producer.get(topic);
        if (cached != null) {
            return cached;
        }
        uploadSchema(topic);
        Producer<byte[]> created = createProducer(this.clientConfigurationData, this.producerConf, topic);
        this.topic2Producer.put(topic, created);
        return created;
    }

    /**
     * Creates a producer for {@code topic} on the cached Pulsar client. The settings in
     * {@code producerConf} are loaded after the built-in batching settings.
     *
     * @param clientConfigurationData configuration identifying the cached client
     * @param producerConf            producer settings passed to {@code loadConf}
     * @param topic                   topic to produce to
     * @return the newly created producer
     * @throws RuntimeException wrapping the {@link PulsarClientException} if creation fails
     */
    protected Producer<byte[]> createProducer(ClientConfigurationData clientConfigurationData, Map<String, Object> producerConf, String topic) {
        try {
            return CachedPulsarClient.getOrCreate(clientConfigurationData)
                    .newProducer(Schema.AUTO_PRODUCE_BYTES(this.serializationSchema.getPulsarSchema()))
                    .topic(topic)
                    .batchingMaxPublishDelay(100L, TimeUnit.MILLISECONDS)
                    .batchingMaxBytes(Commands.DEFAULT_MAX_MESSAGE_SIZE)
                    .loadConf(producerConf)
                    .create();
        } catch (PulsarClientException e) {
            log.error("Failed to create producer for topic {}", topic, e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Flushes every open producer and then blocks until {@link #pendingRecords} is zero.
     *
     * @throws RuntimeException if the waiting thread is interrupted
     * @throws Exception        if flushing a producer fails
     */
    public void producerFlush() throws Exception {
        if (this.singleProducer != null) {
            this.singleProducer.flush();
        } else if (this.topic2Producer != null) {
            for (Producer<byte[]> producer : this.topic2Producer.values()) {
                producer.flush();
            }
        }
        synchronized (this.pendingRecordsLock) {
            // Woken by acknowledgeMessage() once the last outstanding send has completed.
            while (this.pendingRecords > 0) {
                try {
                    this.pendingRecordsLock.wait();
                } catch (InterruptedException e) {
                    throw new RuntimeException("Flushing got interrupted while checkpointing", e);
                }
            }
        }
    }

    /**
     * Flushes outstanding sends, then closes the admin client and every producer.
     *
     * <p>Each resource is closed even if the flush or an earlier close failed; the first
     * failure is thrown after cleanup with later failures attached as suppressed.
     *
     * @throws Exception the first failure encountered while flushing or closing
     */
    protected void producerClose() throws Exception {
        Exception failure = null;
        try {
            producerFlush();
        } catch (Exception e) {
            failure = e;
        }
        if (this.admin != null) {
            try {
                this.admin.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        }
        if (this.singleProducer != null) {
            try {
                this.singleProducer.close();
            } catch (Exception e) {
                failure = chain(failure, e);
            }
        } else if (this.topic2Producer != null) {
            for (Producer<byte[]> producer : this.topic2Producer.values()) {
                try {
                    producer.close();
                } catch (Exception e) {
                    failure = chain(failure, e);
                }
            }
            this.topic2Producer.clear();
        }
        if (failure != null) {
            throw failure;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Throws the recorded send failure, if any, and clears it so it is reported only once.
     *
     * @throws Exception wrapping the recorded failure
     */
    public void checkErroneous() throws Exception {
        Throwable error = this.failedWrite;
        if (error != null) {
            this.failedWrite = null;
            throw new Exception("Failed to send data to Pulsar: " + error.getMessage(), error);
        }
    }

    /**
     * Marks one send as completed. Only tracked while flushing on checkpoint is enabled;
     * waiters on {@link #pendingRecordsLock} are notified when the count reaches zero.
     */
    private void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords--;
                if (this.pendingRecords == 0) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }

    /** Returns {@code first} with {@code next} attached as suppressed, or {@code next} if there is no first. */
    private static Exception chain(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }
}
