package org.apache.samza.system.eventhub;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.microsoft.azure.eventhubs.ConnectionStringBuilder;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.RetryPolicy;
import com.microsoft.azure.eventhubs.impl.RetryExponential;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.samza.SamzaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/eventhub/SamzaEventHubClientManager.class */
/**
 * {@link EventHubClientManager} that owns a single {@link EventHubClient} together with the
 * fixed-size thread pool handed to that client.
 *
 * <p>Lifecycle: construct, call {@link #init()} once to create the executor and connect, use
 * {@link #getEventHubClient()}, then call {@link #close(long)} to release the client and the
 * executor. {@code getEventHubClient()} returns {@code null} until {@code init()} has succeeded.
 */
public class SamzaEventHubClientManager implements EventHubClientManager {
    private static final Logger LOG = LoggerFactory.getLogger(SamzaEventHubClientManager.class);

    private static final String EVENTHUB_REMOTE_HOST_FORMAT = "%s.servicebus.windows.net";
    // Standard AMQP-over-TLS port; only used to build the failure message in init().
    private static final int AMQPS_PORT = 5671;
    private static final int MAX_RETRY_COUNT = 100;
    private static final String SAMZA_EVENTHUB_RETRY = "SAMZA_CONNECTOR_RETRY";
    private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(100);
    private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(11000);

    private final String eventHubNamespace;
    private final String entityPath;
    private final String sasKeyName;
    private final String sasKey;
    private final RetryPolicy retryPolicy;
    private final int numClientThreads;

    // Both are created by init() and remain null until then (or if init() fails).
    private EventHubClient eventHubClient;
    private ExecutorService eventHubClientExecutor;

    /**
     * Creates a manager that uses an exponential retry policy built from
     * {@code MIN_RETRY_BACKOFF}, {@code MAX_RETRY_BACKOFF} and {@code MAX_RETRY_COUNT}.
     *
     * @param eventHubNamespace namespace of the EventHub
     * @param entityPath        name of the EventHub entity within the namespace
     * @param sasKeyName        SAS key name used to build the connection string
     * @param sasKey            SAS key used to build the connection string
     * @param numClientThreads  size of the client thread pool; must not be {@code null}
     */
    public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName,
            String sasKey, Integer numClientThreads) {
        this(eventHubNamespace, entityPath, sasKeyName, sasKey,
                new RetryExponential(MIN_RETRY_BACKOFF, MAX_RETRY_BACKOFF, MAX_RETRY_COUNT, SAMZA_EVENTHUB_RETRY),
                numClientThreads.intValue());
    }

    /**
     * Creates a manager with a caller-supplied retry policy.
     *
     * @param eventHubNamespace namespace of the EventHub
     * @param entityPath        name of the EventHub entity within the namespace
     * @param sasKeyName        SAS key name used to build the connection string
     * @param sasKey            SAS key used to build the connection string
     * @param retryPolicy       retry policy passed to the client on creation
     * @param numClientThreads  size of the client thread pool
     */
    public SamzaEventHubClientManager(String eventHubNamespace, String entityPath, String sasKeyName,
            String sasKey, RetryPolicy retryPolicy, int numClientThreads) {
        this.eventHubNamespace = eventHubNamespace;
        this.entityPath = entityPath;
        this.sasKeyName = sasKeyName;
        this.sasKey = sasKey;
        this.retryPolicy = retryPolicy;
        this.numClientThreads = numClientThreads;
    }

    /**
     * Creates the client thread pool and synchronously connects the {@link EventHubClient}.
     *
     * @throws SamzaException if the client cannot be created; the thread pool created for the
     *                        attempt is shut down before the exception is thrown
     */
    @Override
    public void init() {
        String remoteHost = String.format(EVENTHUB_REMOTE_HOST_FORMAT, this.eventHubNamespace);
        LOG.info("Initializing SamzaEventHubClientManager for namespace: {}", this.eventHubNamespace);

        ExecutorService executor = null;
        try {
            ConnectionStringBuilder connectionString = new ConnectionStringBuilder()
                    .setNamespaceName(this.eventHubNamespace)
                    .setEventHubName(this.entityPath)
                    .setSasKeyName(this.sasKeyName)
                    .setSasKey(this.sasKey);

            executor = Executors.newFixedThreadPool(this.numClientThreads,
                    new ThreadFactoryBuilder().setNameFormat("Samza EventHubClient Thread-%d").setDaemon(true).build());
            this.eventHubClient = EventHubClient.createSync(connectionString.toString(), this.retryPolicy, executor);
            // Publish the executor only once the client that uses it exists.
            this.eventHubClientExecutor = executor;
            LOG.info("SamzaEventHubClientManager initialized for namespace: {}", this.eventHubNamespace);
        } catch (IOException | EventHubException e) {
            // Don't leave an orphaned pool behind when the connection attempt fails.
            if (executor != null) {
                executor.shutdown();
            }
            String message = String.format(
                    "Creation of EventHub client failed for eventHub EntityPath: %s on remote host %s:%d",
                    this.entityPath, remoteHost, AMQPS_PORT);
            LOG.error(message, e);
            throw new SamzaException(message, e);
        }
    }

    /**
     * Returns the client created by {@link #init()}.
     *
     * @return the EventHub client, or {@code null} if {@code init()} has not completed successfully
     */
    @Override
    public EventHubClient getEventHubClient() {
        return this.eventHubClient;
    }

    /**
     * Closes the client and shuts down its thread pool. Failures are logged, never thrown.
     *
     * <p>The thread pool is shut down whether or not closing the client succeeds, on both the
     * blocking and the timed path. Calling this before {@link #init()} is a no-op.
     *
     * @param timeoutMs {@code -1} to block until the client is closed; otherwise the maximum
     *                  time in milliseconds to wait for the asynchronous close to complete
     */
    @Override
    public void close(long timeoutMs) {
        try {
            if (this.eventHubClient != null) {
                if (timeoutMs == -1) {
                    this.eventHubClient.closeSync();
                } else {
                    this.eventHubClient.close().get(timeoutMs, TimeUnit.MILLISECONDS);
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            LOG.error("Closing the EventHub client failed", e);
        } catch (Exception e) {
            LOG.error("Closing the EventHub client failed", e);
        } finally {
            // shutdown() (not shutdownNow()) so tasks already submitted by the client may finish.
            if (this.eventHubClientExecutor != null) {
                this.eventHubClientExecutor.shutdown();
            }
        }
    }
}
