package io.debezium.server.eventhubs;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubProducerClient;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import io.debezium.DebeziumException;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import java.util.HashMap;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/server/eventhubs/BatchManager.class */
public class BatchManager {
    private final EventHubProducerClient producer;
    private final String configuredPartitionId;
    private final String configuredPartitionKey;
    private final Integer maxBatchSize;
    private final HashMap<Integer, CreateBatchOptions> batchOptions = new HashMap<>();
    private final HashMap<Integer, EventDataBatchProxy> batches = new HashMap<>();
    private List<ChangeEvent<Object, Object>> records;
    private DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> committer;
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchManager.class);
    static final Integer BATCH_INDEX_FOR_NO_PARTITION_ID = -1;
    static final Integer BATCH_INDEX_FOR_PARTITION_KEY = 0;

    public BatchManager(EventHubProducerClient eventHubProducerClient, String str, String str2, Integer num) {
        this.producer = eventHubProducerClient;
        this.configuredPartitionId = str;
        this.configuredPartitionKey = str2;
        this.maxBatchSize = num;
    }

    public void initializeBatch(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter) {
        this.records = list;
        this.committer = recordCommitter;
        if (this.configuredPartitionId.isEmpty() && this.configuredPartitionKey.isEmpty()) {
            CreateBatchOptions createBatchOptions = new CreateBatchOptions();
            if (this.maxBatchSize.intValue() != 0) {
                createBatchOptions.setMaximumSizeInBytes(this.maxBatchSize.intValue());
            }
            this.batchOptions.put(BATCH_INDEX_FOR_NO_PARTITION_ID, createBatchOptions);
            this.producer.getPartitionIds().stream().forEach(str -> {
                CreateBatchOptions partitionId = new CreateBatchOptions().setPartitionId(str);
                if (this.maxBatchSize.intValue() != 0) {
                    partitionId.setMaximumSizeInBytes(this.maxBatchSize.intValue());
                }
                this.batchOptions.put(Integer.valueOf(Integer.parseInt(str)), partitionId);
            });
            this.batchOptions.forEach((num, createBatchOptions2) -> {
                this.batches.put(num, new EventDataBatchProxy(this.producer, createBatchOptions2));
            });
            return;
        }
        CreateBatchOptions createBatchOptions3 = new CreateBatchOptions();
        if (!this.configuredPartitionId.isEmpty()) {
            createBatchOptions3.setPartitionId(this.configuredPartitionId);
            this.batchOptions.put(Integer.valueOf(Integer.parseInt(this.configuredPartitionId)), createBatchOptions3);
            this.batches.put(Integer.valueOf(Integer.parseInt(this.configuredPartitionId)), new EventDataBatchProxy(this.producer, createBatchOptions3));
        } else if (!this.configuredPartitionKey.isEmpty()) {
            createBatchOptions3.setPartitionKey(this.configuredPartitionKey);
            this.batchOptions.put(BATCH_INDEX_FOR_PARTITION_KEY, createBatchOptions3);
            this.batches.put(BATCH_INDEX_FOR_PARTITION_KEY, new EventDataBatchProxy(this.producer, createBatchOptions3));
        }
        if (this.maxBatchSize.intValue() != 0) {
            createBatchOptions3.setMaximumSizeInBytes(this.maxBatchSize.intValue());
        }
    }

    public void closeAndEmitBatches() {
        this.batches.forEach((num, eventDataBatchProxy) -> {
            if (eventDataBatchProxy.getCount() > 0) {
                LOGGER.trace("Dispatching {} events.", Integer.valueOf(eventDataBatchProxy.getCount()));
                emitBatchToEventHub(this.records, this.committer, eventDataBatchProxy);
            }
        });
    }

    public void sendEventToPartitionId(EventData eventData, Integer num, Integer num2) {
        EventDataBatchProxy eventDataBatchProxy = this.batches.get(num2);
        if (eventDataBatchProxy.tryAdd(eventData)) {
            return;
        }
        if (eventDataBatchProxy.getCount() == 0) {
            throw new DebeziumException("Event data is too large to fit into batch");
        }
        LOGGER.debug("Maximum batch size reached, dispatching {} events.", Integer.valueOf(eventDataBatchProxy.getCount()));
        emitBatchToEventHub(this.records, this.committer, eventDataBatchProxy);
        this.batches.put(num2, new EventDataBatchProxy(this.producer, this.batchOptions.get(num2)));
    }

    private void emitBatchToEventHub(List<ChangeEvent<Object, Object>> list, DebeziumEngine.RecordCommitter<ChangeEvent<Object, Object>> recordCommitter, EventDataBatchProxy eventDataBatchProxy) {
        int count = eventDataBatchProxy.getCount();
        if (count > 0) {
            try {
                LOGGER.trace("Sending batch of {} events to Event Hubs", Integer.valueOf(count));
                eventDataBatchProxy.emit();
                LOGGER.trace("Sent record batch to Event Hubs");
            } catch (Exception e) {
                throw new DebeziumException(e);
            }
        }
    }
}
