package org.apache.flink.streaming.connectors.pulsar;

import java.util.ArrayList;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarClientUtils;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageRouter;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/FlinkPulsarSink.class */
public class FlinkPulsarSink<T> extends FlinkPulsarSinkBase<T> {
    private static final Logger log = LoggerFactory.getLogger(FlinkPulsarSink.class);
    private final PulsarSerializationSchema<T> serializationSchema;

    public FlinkPulsarSink(String str, Optional<String> optional, ClientConfigurationData clientConfigurationData, Properties properties, PulsarSerializationSchema pulsarSerializationSchema, MessageRouter messageRouter, PulsarSinkSemantic pulsarSinkSemantic) {
        super(str, optional, clientConfigurationData, properties, pulsarSerializationSchema, messageRouter, pulsarSinkSemantic);
        this.serializationSchema = pulsarSerializationSchema;
    }

    public FlinkPulsarSink(String str, Optional<String> optional, ClientConfigurationData clientConfigurationData, Properties properties, PulsarSerializationSchema pulsarSerializationSchema, PulsarSinkSemantic pulsarSinkSemantic) {
        this(str, optional, clientConfigurationData, properties, pulsarSerializationSchema, null, pulsarSinkSemantic);
    }

    public FlinkPulsarSink(String str, Optional<String> optional, ClientConfigurationData clientConfigurationData, Properties properties, PulsarSerializationSchema pulsarSerializationSchema) {
        this(str, optional, clientConfigurationData, properties, pulsarSerializationSchema, PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    public FlinkPulsarSink(String str, String str2, Optional<String> optional, Properties properties, PulsarSerializationSchema<T> pulsarSerializationSchema) {
        this(str2, optional, PulsarClientUtils.newClientConf((String) Preconditions.checkNotNull(str), properties), properties, pulsarSerializationSchema, PulsarSinkSemantic.AT_LEAST_ONCE);
    }

    public void invoke(FlinkPulsarSinkBase.PulsarTransactionState<T> pulsarTransactionState, T t, SinkFunction.Context context) throws Exception {
        checkErroneous();
        initializeSendCallback();
        String orElse = this.serializationSchema.getTargetTopic(t).orElse(this.defaultTopic);
        TypedMessageBuilder<T> newMessage = pulsarTransactionState.isTransactional() ? getProducer(orElse).newMessage(pulsarTransactionState.getTransaction()) : getProducer(orElse).newMessage();
        this.serializationSchema.serialize(t, newMessage);
        if (this.flushOnCheckpoint) {
            synchronized (this.pendingRecordsLock) {
                this.pendingRecords++;
            }
        }
        CompletableFuture<MessageId> sendAsync = newMessage.sendAsync();
        if (pulsarTransactionState.isTransactional()) {
            Thread.sleep(10L);
            this.tid2FuturesMap.computeIfAbsent(pulsarTransactionState.transactionalId, txnID -> {
                return new ArrayList();
            }).add(sendAsync);
            log.debug("message {} is invoke in txn {}", t, pulsarTransactionState.transactionalId);
        }
        sendAsync.whenComplete(this.sendCallback);
    }

    @Override // org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase
    public /* bridge */ /* synthetic */ void producerFlush(FlinkPulsarSinkBase.PulsarTransactionState pulsarTransactionState) throws Exception {
        super.producerFlush(pulsarTransactionState);
    }

    @Override // org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase
    public /* bridge */ /* synthetic */ void close() throws Exception {
        super.close();
    }

    @Override // org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase
    public /* bridge */ /* synthetic */ void open(Configuration configuration) throws Exception {
        super.open(configuration);
    }

    @Override // org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase
    public /* bridge */ /* synthetic */ void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
        super.initializeState(functionInitializationContext);
    }

    @Override // org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSinkBase
    public /* bridge */ /* synthetic */ void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        super.snapshotState(functionSnapshotContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    public /* bridge */ /* synthetic */ void invoke(Object obj, Object obj2, SinkFunction.Context context) throws Exception {
        invoke((FlinkPulsarSinkBase.PulsarTransactionState<FlinkPulsarSinkBase.PulsarTransactionState<T>>) obj, (FlinkPulsarSinkBase.PulsarTransactionState<T>) obj2, context);
    }
}
