package org.apache.samza.system.eventhub.producer;

import com.google.common.annotations.VisibleForTesting;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventhubs.impl.EventDataImpl;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.lang3.Validate;
import org.apache.samza.SamzaException;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.eventhub.EventHubClientManager;
import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.Interceptor;
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/eventhub/producer/EventHubSystemProducer.class */
public class EventHubSystemProducer extends AsyncSystemProducer {
    /** Event property name under which the send-time timestamp (epoch millis rendered as a string) is stored. */
    public static final String PRODUCE_TIMESTAMP = "produce-timestamp";
    /** Event property name under which the envelope key is stored when the config enables sending the key. */
    public static final String KEY = "key";
    // Metric names; each is registered once per stream id and once for the aggregate group in start().
    private static final String EVENT_SKIP_RATE = "eventSkipRate";
    private static final String EVENT_WRITE_RATE = "eventWriteRate";
    private static final String EVENT_BYTE_WRITE_RATE = "eventByteWriteRate";
    // Per-stream counters keyed by stream id; populated in start() and updated in sendAsync().
    private final HashMap<String, Counter> eventSkipRate;
    private final HashMap<String, Counter> eventWriteRate;
    private final HashMap<String, Counter> eventByteWriteRate;
    private final EventHubConfig config;
    private final PartitioningMethod partitioningMethod;
    private final String systemName;
    // Messages larger than this many bytes are skipped by sendAsync(); values <= 0 disable the check.
    private final int maxMessageSize;
    // Set to true at the end of start() and back to false in stop().
    private volatile boolean isStarted;
    // Set to true by init(), which sendAsync() invokes on first use; reset to false in stop().
    private boolean isInitialized;

    // stream id -> partition id -> client manager used for that partition.
    @VisibleForTesting
    final Map<String, Map<Integer, EventHubClientManager>> perPartitionEventHubClients;
    // stream id -> client manager for the whole stream; created in init().
    private final Map<String, EventHubClientManager> perStreamEventHubClientManagers;
    // stream id -> partition id -> partition sender; only populated for PARTITION_KEY_AS_PARTITION.
    private final Map<String, Map<Integer, PartitionSender>> streamPartitionSenders;
    // stream id -> interceptor applied to the message bytes in createEventData().
    private final Map<String, Interceptor> interceptors;
    private final EventHubClientManagerFactory eventHubClientManagerFactory;
    private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemProducer.class.getName());
    // Timeout used in stop() when waiting for partition senders and client managers to close.
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();
    // Guards the one-time creation of the static aggregate counters below (see start()).
    private static final Object AGGREGATE_METRICS_LOCK = new Object();
    private static Counter aggEventSkipRate = null;
    private static Counter aggEventWriteRate = null;
    private static Counter aggEventByteWriteRate = null;

    /* loaded from: input_file:org/apache/samza/system/eventhub/producer/EventHubSystemProducer$PartitioningMethod.class */
    /**
     * Strategy used by {@code sendToEventHub} to decide how an event is routed.
     */
    public enum PartitioningMethod {
        /** The event is sent through the stream's client without any partition key. */
        ROUND_ROBIN,
        /** The event is sent with the envelope's partition key converted to a string; a null key is rejected. */
        EVENT_HUB_HASHING,
        /** The envelope's Integer partition key, modulo the number of partition senders, selects a per-partition sender. */
        PARTITION_KEY_AS_PARTITION
    }

    /**
     * Creates a producer for the given system. No Event Hub connection is opened here;
     * client managers are created later by {@code init()}.
     *
     * @param eventHubConfig configuration used to look up streams, partitioning method and size limits
     * @param str name of the system this producer belongs to
     * @param eventHubClientManagerFactory factory used to create client managers
     * @param map interceptors keyed by stream id
     * @param metricsRegistry registry handed to the superclass
     */
    public EventHubSystemProducer(EventHubConfig eventHubConfig, String str, EventHubClientManagerFactory eventHubClientManagerFactory, Map<String, Interceptor> map, MetricsRegistry metricsRegistry) {
        super(str, eventHubConfig, metricsRegistry);

        // Lifecycle flags start out cleared.
        this.isStarted = false;
        this.isInitialized = false;

        // Per-stream metric holders.
        this.eventSkipRate = new HashMap<>();
        this.eventWriteRate = new HashMap<>();
        this.eventByteWriteRate = new HashMap<>();

        // Connection bookkeeping, filled in by init().
        this.perPartitionEventHubClients = new HashMap<>();
        this.perStreamEventHubClientManagers = new HashMap<>();
        this.streamPartitionSenders = new HashMap<>();

        LOG.info("Creating EventHub Producer for system {}", str);

        this.config = eventHubConfig;
        this.systemName = str;
        this.partitioningMethod = eventHubConfig.getPartitioningMethod(str);
        this.interceptors = map;
        this.maxMessageSize = eventHubConfig.getSkipMessagesLargerThan(str);
        this.eventHubClientManagerFactory = eventHubClientManagerFactory;
    }

    /**
     * Creates and initializes one client manager per configured stream and, when the
     * partitioning method is {@code PARTITION_KEY_AS_PARTITION}, one partition sender per
     * partition of every stream. Marks the producer as initialized on success.
     *
     * @throws SamzaException if the partition count cannot be fetched or a partition sender
     *         cannot be created
     */
    private void init() {
        LOG.info("Initializing EventHubSystemProducer");
        for (String streamId : this.config.getStreams(this.systemName)) {
            EventHubClientManager streamManager = this.eventHubClientManagerFactory.getEventHubClientManager(this.systemName, streamId, this.config);
            this.perStreamEventHubClientManagers.put(streamId, streamManager);
            streamManager.init();
        }
        if (PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(this.partitioningMethod)) {
            this.perStreamEventHubClientManagers.forEach(this::createPartitionSenders);
        }
        this.isInitialized = true;
        LOG.info("EventHubSystemProducer initialized.");
    }

    /**
     * Builds the partition-id to sender map for one stream and stores it in
     * {@code streamPartitionSenders}.
     */
    private void createPartitionSenders(String streamId, EventHubClientManager streamManager) {
        EventHubClient eventHubClient = streamManager.getEventHubClient();
        try {
            Map<Integer, PartitionSender> sendersByPartition = new HashMap<>();
            EventHubRuntimeInformation runtimeInfo = eventHubClient.getRuntimeInformation()
                    .get(this.config.getRuntimeInfoWaitTimeMS(this.systemName), TimeUnit.MILLISECONDS);
            int partitionCount = runtimeInfo.getPartitionCount();
            for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
                PartitionSender sender = createOrGetEventHubClientManagerForPartition(streamId, partitionId)
                        .getEventHubClient()
                        .createPartitionSenderSync(String.valueOf(partitionId));
                sendersByPartition.put(partitionId, sender);
            }
            this.streamPartitionSenders.put(streamId, sendersByPartition);
        } catch (EventHubException | IllegalArgumentException e) {
            throw new SamzaException("Creation of partition sender failed with exception", e);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            if (e instanceof InterruptedException) {
                // Restore the interrupt flag so callers further up can still observe the interruption.
                Thread.currentThread().interrupt();
            }
            throw new SamzaException("Failed to fetch number of Event Hub partitions for partition sender creation", e);
        }
    }

    /**
     * Logs the registration of a source. The source itself is not recorded by this method.
     *
     * @param str name of the source being registered
     * @throws SamzaException if the producer has already been started
     */
    public synchronized void register(String str) {
        LOG.info("Registering source {}", str);
        if (this.isStarted) {
            throw new SamzaException("Cannot register once the producer is started.");
        }
    }

    /**
     * Returns the client manager to use for one partition of a stream and records it in
     * {@code perPartitionEventHubClients}.
     *
     * <p>When per-partition connections are disabled in the config, the stream-level manager
     * is reused. Otherwise an existing manager for the partition is returned, or a new one is
     * created and initialized.
     *
     * @param str stream id
     * @param i partition id
     * @return the non-null client manager for the partition
     */
    private EventHubClientManager createOrGetEventHubClientManagerForPartition(String str, int i) {
        Map<Integer, EventHubClientManager> managersByPartition =
                this.perPartitionEventHubClients.computeIfAbsent(str, unused -> new HashMap<>());
        boolean dedicatedConnection = this.config.getPerPartitionConnection(this.systemName);

        final EventHubClientManager manager;
        if (dedicatedConnection) {
            if (managersByPartition.containsKey(i)) {
                LOG.warn(String.format("Trying to create new EventHubClientManager for partition=%d. But one already exists", i));
                manager = managersByPartition.get(i);
            } else {
                LOG.info(String.format("Creating EventHub client manager for streamId=%s, partitionId=%d: ", str, i));
                manager = this.eventHubClientManagerFactory.getEventHubClientManager(this.systemName, str, this.config);
                manager.init();
                managersByPartition.put(i, manager);
            }
        } else {
            // Shared connection: every partition maps to the stream-level manager.
            manager = this.perStreamEventHubClientManagers.get(str);
            managersByPartition.put(i, manager);
        }

        Validate.notNull(manager, String.format("Fail to create or get EventHubClientManager for streamId=%s, partitionId=%d", str, i));
        return manager;
    }

    /**
     * Starts the producer: delegates to the superclass, registers the per-stream counters,
     * creates the shared aggregate counters if they do not exist yet, and marks the producer
     * as started.
     */
    @Override // org.apache.samza.system.eventhub.producer.AsyncSystemProducer
    public synchronized void start() {
        super.start();
        LOG.info("Starting system producer.");
        this.streamIds.forEach(this::registerStreamCounters);
        registerAggregateCountersIfAbsent();
        this.isStarted = true;
    }

    /** Registers the skip/write/byte-write counters for one stream id. */
    private void registerStreamCounters(String streamId) {
        Counter skipCounter = this.metricsRegistry.newCounter(streamId, EVENT_SKIP_RATE);
        this.eventSkipRate.put(streamId, skipCounter);
        Counter writeCounter = this.metricsRegistry.newCounter(streamId, EVENT_WRITE_RATE);
        this.eventWriteRate.put(streamId, writeCounter);
        Counter byteWriteCounter = this.metricsRegistry.newCounter(streamId, EVENT_BYTE_WRITE_RATE);
        this.eventByteWriteRate.put(streamId, byteWriteCounter);
    }

    /** Creates the static aggregate counters once; later calls find them already set. */
    private void registerAggregateCountersIfAbsent() {
        synchronized (AGGREGATE_METRICS_LOCK) {
            if (aggEventWriteRate != null) {
                return;
            }
            aggEventSkipRate = this.metricsRegistry.newCounter(EventHubSystemConsumer.AGGREGATE, EVENT_SKIP_RATE);
            aggEventWriteRate = this.metricsRegistry.newCounter(EventHubSystemConsumer.AGGREGATE, EVENT_WRITE_RATE);
            aggEventByteWriteRate = this.metricsRegistry.newCounter(EventHubSystemConsumer.AGGREGATE, EVENT_BYTE_WRITE_RATE);
        }
    }

    /**
     * Delegates to the superclass flush while holding this producer's monitor.
     *
     * @param str source name passed through to the superclass
     */
    @Override // org.apache.samza.system.eventhub.producer.AsyncSystemProducer
    public synchronized void flush(String str) {
        super.flush(str);
    }

    /**
     * Converts the envelope into an event and sends it to the Event Hub of the envelope's stream.
     *
     * <p>The producer is lazily initialized on the first call. Messages whose payload exceeds
     * the configured maximum size (when that limit is positive) are counted as skipped and
     * not sent.
     *
     * @param str source name (not used by this method)
     * @param outgoingMessageEnvelope envelope to send
     * @return the future of the send, or an already-completed future if the message was skipped
     * @throws SamzaException if the producer is not started or the stream is not registered
     */
    @Override // org.apache.samza.system.eventhub.producer.AsyncSystemProducer
    public synchronized CompletableFuture<Void> sendAsync(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        // Parameterized logging: the envelope is only stringified when debug is enabled.
        LOG.debug("Trying to send {}", outgoingMessageEnvelope);
        if (!this.isStarted) {
            throw new SamzaException("Trying to call send before the producer is started.");
        }
        if (!this.isInitialized) {
            init();
        }
        String streamId = this.config.getStreamId(outgoingMessageEnvelope.getSystemStream().getStream());
        if (!this.perStreamEventHubClientManagers.containsKey(streamId)) {
            throw new SamzaException(String.format("Trying to send event to a destination {%s} that is not registered.", streamId));
        }
        EventData eventData = createEventData(streamId, outgoingMessageEnvelope);
        byte[] payload = eventData.getBytes();
        int eventDataLength = payload == null ? 0 : payload.length;
        if (this.maxMessageSize > 0 && eventDataLength > this.maxMessageSize) {
            // The second placeholder was malformed ("{(})"), so the configured limit was never printed.
            LOG.info("Received a message with size {} > maxMessageSize configured {}, Skipping it", eventDataLength, this.maxMessageSize);
            this.eventSkipRate.get(streamId).inc();
            aggEventSkipRate.inc();
            return CompletableFuture.completedFuture(null);
        }
        this.eventWriteRate.get(streamId).inc();
        aggEventWriteRate.inc();
        this.eventByteWriteRate.get(streamId).inc(eventDataLength);
        aggEventByteWriteRate.inc(eventDataLength);
        Object partitionKey = getEnvelopePartitionId(outgoingMessageEnvelope);
        EventHubClient eventHubClient = this.perStreamEventHubClientManagers.get(streamId).getEventHubClient();
        return sendToEventHub(streamId, eventData, partitionKey, eventHubClient);
    }

    /**
     * Sends an event using the configured partitioning method.
     *
     * @param str stream id, used to look up the partition senders
     * @param eventData event to send
     * @param obj partition key taken from the envelope; may be null for {@code ROUND_ROBIN}
     * @param eventHubClient stream-level client used for the non-sender-based methods
     * @return the future returned by the underlying send call
     * @throws SamzaException if the key is null for {@code EVENT_HUB_HASHING}, is not an
     *         Integer for {@code PARTITION_KEY_AS_PARTITION}, or the method is unknown
     */
    private CompletableFuture<Void> sendToEventHub(String str, EventData eventData, Object obj, EventHubClient eventHubClient) {
        if (PartitioningMethod.ROUND_ROBIN.equals(this.partitioningMethod)) {
            return eventHubClient.send(eventData);
        }
        if (PartitioningMethod.EVENT_HUB_HASHING.equals(this.partitioningMethod)) {
            if (obj == null) {
                throw new SamzaException("Partition key cannot be null for EventHub hashing");
            }
            return eventHubClient.send(eventData, convertPartitionKeyToString(obj));
        }
        if (!PartitioningMethod.PARTITION_KEY_AS_PARTITION.equals(this.partitioningMethod)) {
            throw new SamzaException("Unknown partitioning method " + this.partitioningMethod);
        }
        if (!(obj instanceof Integer)) {
            throw new SamzaException("Partition key should be of type Integer");
        }
        Map<Integer, PartitionSender> sendersByPartition = this.streamPartitionSenders.get(str);
        int requestedPartition = (Integer) obj;
        // floorMod keeps the index in [0, size) for negative keys; '%' would give a negative
        // index, a null sender lookup and a NullPointerException.
        int partitionId = Math.floorMod(requestedPartition, sendersByPartition.size());
        return sendersByPartition.get(partitionId).send(eventData);
    }

    /**
     * Picks the value used for partitioning: the envelope's partition key when present,
     * otherwise the envelope's key.
     *
     * @param outgoingMessageEnvelope envelope being sent
     * @return the partition key, or the envelope key if the partition key is null
     */
    protected Object getEnvelopePartitionId(OutgoingMessageEnvelope outgoingMessageEnvelope) {
        Object partitionKey = outgoingMessageEnvelope.getPartitionKey();
        if (partitionKey != null) {
            return partitionKey;
        }
        return outgoingMessageEnvelope.getKey();
    }

    /**
     * Renders a partition key as a string of at most 128 characters.
     *
     * <p>Strings are used as-is, Integers are converted with {@code String.valueOf}, and byte
     * arrays are decoded with the platform default charset. Longer results are truncated.
     *
     * @param obj partition key; must be a String, Integer or byte[]
     * @return the string form of the key, truncated to 128 characters if necessary
     * @throws SamzaException if the key is of any other type
     */
    private String convertPartitionKeyToString(Object obj) {
        final int maxKeyLength = 128;

        final String key;
        if (obj instanceof String) {
            key = (String) obj;
        } else if (obj instanceof Integer) {
            key = String.valueOf(obj);
        } else if (obj instanceof byte[]) {
            key = new String((byte[]) obj, Charset.defaultCharset());
        } else {
            throw new SamzaException("Unsupported key type: " + obj.getClass().toString());
        }

        if (key == null || key.length() <= maxKeyLength) {
            return key;
        }
        LOG.debug("Length of partition key: {} exceeds limit: {}. Truncating.", key.length(), maxKeyLength);
        return key.substring(0, maxKeyLength);
    }

    /**
     * Builds the event for an envelope.
     *
     * <p>The message bytes are passed through the stream's interceptor when one is configured.
     * The event always carries the {@code PRODUCE_TIMESTAMP} property; when the config enables
     * it, the envelope key is added under {@code KEY} (empty string for a null key).
     *
     * @param str stream id used to look up the interceptor
     * @param outgoingMessageEnvelope envelope whose message must be a byte array
     * @return the event to send
     */
    protected EventData createEventData(String str, OutgoingMessageEnvelope outgoingMessageEnvelope) {
        Interceptor interceptor = this.interceptors.getOrDefault(str, null);
        byte[] payload = (byte[]) outgoingMessageEnvelope.getMessage();
        if (interceptor != null) {
            payload = interceptor.intercept(payload);
        }

        EventDataImpl event = new EventDataImpl(payload);
        String producedAt = Long.toString(System.currentTimeMillis());
        event.getProperties().put(PRODUCE_TIMESTAMP, producedAt);

        if (this.config.getSendKeyInEventProperties(this.systemName)) {
            Object envelopeKey = outgoingMessageEnvelope.getKey();
            final String keyValue;
            if (envelopeKey == null) {
                keyValue = "";
            } else if (envelopeKey instanceof byte[]) {
                keyValue = new String((byte[]) envelopeKey);
            } else {
                keyValue = envelopeKey.toString();
            }
            event.getProperties().put(KEY, keyValue);
        }
        return event;
    }

    /**
     * Stops the producer: closes all partition senders, then the per-stream client managers,
     * then (when per-partition connections are enabled) the per-partition client managers,
     * and finally resets the started/initialized flags.
     *
     * <p>Failures while closing partition senders are logged and do not abort the shutdown.
     * If the calling thread is interrupted while waiting, the remaining resources are still
     * closed and the interrupt flag is restored before returning.
     */
    public synchronized void stop() {
        LOG.info("Stopping producer.");
        boolean interrupted = false;
        for (Map<Integer, PartitionSender> sendersByPartition : this.streamPartitionSenders.values()) {
            Collection<CompletableFuture<Void>> closeFutures = new ArrayList<>();
            for (PartitionSender sender : sendersByPartition.values()) {
                closeFutures.add(sender.close());
            }
            try {
                CompletableFuture.allOf(closeFutures.toArray(new CompletableFuture[0]))
                        .get(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                // Remember the interruption but keep shutting down; the flag is restored below.
                interrupted = true;
                LOG.error("Closing the partition sender failed ", e);
            } catch (ExecutionException | TimeoutException e) {
                LOG.error("Closing the partition sender failed ", e);
            }
        }
        // The senders are closed now; drop them so they are not closed again or reused.
        this.streamPartitionSenders.clear();

        this.perStreamEventHubClientManagers.values().parallelStream().forEach(eventHubClientManager -> {
            eventHubClientManager.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
        });
        this.perStreamEventHubClientManagers.clear();
        if (this.config.getPerPartitionConnection(this.systemName).booleanValue()) {
            this.perPartitionEventHubClients.values().stream().flatMap(map2 -> {
                return map2.values().stream();
            }).forEach(eventHubClientManager2 -> {
                eventHubClientManager2.close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
            });
            this.perPartitionEventHubClients.clear();
        }
        this.isStarted = false;
        this.isInitialized = false;
        LOG.info("EventHubSystemProducer stopped.");
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Package-private accessor for the {@code pendingFutures} collection.
     *
     * @return the {@code pendingFutures} collection itself, not a copy
     */
    Collection<CompletableFuture<Void>> getPendingFutures() {
        return this.pendingFutures;
    }
}
