package io.debezium.server.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.server.BaseChangeConsumer;
import io.debezium.server.CustomConsumerBuilder;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.enterprise.context.Dependent;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import java.util.Iterator;
import java.util.List;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Named("eventhubs")
@Dependent
/* loaded from: input_file:io/debezium/server/eventhubs/EventHubsChangeConsumer.class */
public class EventHubsChangeConsumer extends BaseChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<Object, Object>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsChangeConsumer.class);
    private static final String PROP_PREFIX = "debezium.sink.eventhubs.";
    private static final String PROP_CONNECTION_STRING_NAME = "debezium.sink.eventhubs.connectionstring";
    private static final String PROP_EVENTHUB_NAME = "debezium.sink.eventhubs.hubname";
    private static final String PROP_PARTITION_ID = "debezium.sink.eventhubs.partitionid";
    private static final String PROP_PARTITION_KEY = "debezium.sink.eventhubs.partitionkey";
    private static final String PROP_MAX_BATCH_SIZE = "debezium.sink.eventhubs.maxbatchsize";
    private String connectionString;
    private String eventHubName;
    private String configuredPartitionId;
    private String configuredPartitionKey;
    private Integer maxBatchSize;
    private Integer partitionCount;
    private static final String CONNECTION_STRING_FORMAT = "%s;EntityPath=%s";
    private EventHubProducerClient producer = null;
    private BatchManager batchManager = null;

    @Inject
    @CustomConsumerBuilder
    Instance<EventHubProducerClient> customProducer;

    @PostConstruct
    void connect() {
        if (this.customProducer.isResolvable()) {
            this.producer = (EventHubProducerClient) this.customProducer.get();
            LOGGER.info("Obtained custom configured Event Hubs client for namespace '{}'", ((EventHubProducerClient) this.customProducer.get()).getFullyQualifiedNamespace());
            return;
        }
        Config config = ConfigProvider.getConfig();
        this.connectionString = (String) config.getValue(PROP_CONNECTION_STRING_NAME, String.class);
        this.eventHubName = (String) config.getValue(PROP_EVENTHUB_NAME, String.class);
        this.maxBatchSize = (Integer) config.getOptionalValue(PROP_MAX_BATCH_SIZE, Integer.class).orElse(0);
        this.configuredPartitionId = (String) config.getOptionalValue(PROP_PARTITION_ID, String.class).orElse("");
        this.configuredPartitionKey = (String) config.getOptionalValue(PROP_PARTITION_KEY, String.class).orElse("");
        try {
            this.producer = new EventHubClientBuilder().connectionString(String.format(CONNECTION_STRING_FORMAT, this.connectionString, this.eventHubName)).buildProducerClient();
            this.batchManager = new BatchManager(this.producer, this.configuredPartitionId, this.configuredPartitionKey, this.maxBatchSize);
            LOGGER.info("Using default Event Hubs client for namespace '{}'", this.producer.getFullyQualifiedNamespace());
            this.partitionCount = Integer.valueOf((int) this.producer.getPartitionIds().stream().count());
            LOGGER.trace("Event Hub '{}' has {} partitions available", this.producer.getEventHubName(), this.partitionCount);
            if (!this.configuredPartitionId.isEmpty() && Integer.parseInt(this.configuredPartitionId) > this.partitionCount.intValue() - 1) {
                throw new IndexOutOfBoundsException(String.format("Target partition id %s does not exist in target EventHub %s", this.configuredPartitionId, this.eventHubName));
            }
        } catch (Exception e) {
            throw new DebeziumException(e);
        }
    }

    @PreDestroy
    void close() {
        try {
            this.producer.close();
            LOGGER.info("Closed Event Hubs producer client");
        } catch (Exception e) {
            LOGGER.warn("Exception while closing Event Hubs producer: {}", e);
        }
    }

    public void handleBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) throws InterruptedException {
        EventData eventData;
        Integer partition;
        LOGGER.trace("Event Hubs sink adapter processing change events");
        this.batchManager.initializeBatch(list, recordCommitter);
        int i = 0;
        while (i < list.size()) {
            LOGGER.trace("Emitting events starting from index {}", Integer.valueOf(i));
            while (i < list.size()) {
                ChangeEvent<Object, Object> changeEvent = list.get(i);
                if (null != changeEvent.value()) {
                    if (changeEvent.value() instanceof String) {
                        eventData = new EventData((String) changeEvent.value());
                    } else if (changeEvent.value() instanceof byte[]) {
                        eventData = new EventData(getBytes(changeEvent.value()));
                    } else {
                        LOGGER.warn("Event data in record.value() is not of type String or byte[]");
                    }
                    if (!this.configuredPartitionId.isEmpty()) {
                        partition = Integer.valueOf(Integer.parseInt(this.configuredPartitionId));
                    } else if (this.configuredPartitionKey.isEmpty()) {
                        partition = changeEvent.partition();
                        if (partition == null) {
                            partition = BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID;
                        }
                    } else {
                        partition = BatchManager.BATCH_INDEX_FOR_PARTITION_KEY;
                    }
                    if (partition.intValue() < BatchManager.BATCH_INDEX_FOR_NO_PARTITION_ID.intValue() || partition.intValue() > this.partitionCount.intValue() - 1) {
                        throw new IndexOutOfBoundsException(String.format("Target partition id %d does not exist in target EventHub %s", partition, this.eventHubName));
                    }
                    try {
                        this.batchManager.sendEventToPartitionId(eventData, Integer.valueOf(i), partition);
                    } catch (IllegalArgumentException e) {
                        throw new DebeziumException(e);
                    } catch (Exception e2) {
                        throw new DebeziumException(e2);
                    } catch (AmqpException e3) {
                        throw new DebeziumException("Event data was larger than the maximum size of the batch", e3);
                    }
                }
                i++;
            }
        }
        this.batchManager.closeAndEmitBatches();
        LOGGER.trace("Marking {} records as processed.", Integer.valueOf(list.size()));
        Iterator<ChangeEvent<Object, Object>> it = list.iterator();
        while (it.hasNext()) {
            recordCommitter.markProcessed(it.next());
        }
        recordCommitter.markBatchFinished();
        LOGGER.trace("Batch marked finished");
    }
}
