package org.apache.samza.system.eventhub.admin;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventhubs.EventHubRuntimeInformation;
import com.microsoft.azure.eventhubs.EventPosition;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.PartitionRuntimeInformation;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.startpoint.Startpoint;
import org.apache.samza.startpoint.StartpointOldest;
import org.apache.samza.startpoint.StartpointSpecific;
import org.apache.samza.startpoint.StartpointTimestamp;
import org.apache.samza.startpoint.StartpointUpcoming;
import org.apache.samza.startpoint.StartpointVisitor;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.system.eventhub.EventHubClientManager;
import org.apache.samza.system.eventhub.EventHubClientManagerFactory;
import org.apache.samza.system.eventhub.EventHubConfig;
import org.apache.samza.system.eventhub.consumer.EventHubSystemConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/samza/system/eventhub/admin/EventHubSystemAdmin.class */
public class EventHubSystemAdmin implements SystemAdmin {
    private static final Logger LOG = LoggerFactory.getLogger(EventHubSystemAdmin.class);
    private static final long DEFAULT_SHUTDOWN_TIMEOUT_MILLIS = Duration.ofMinutes(1).toMillis();
    private final EventHubClientManagerFactory eventHubClientManagerFactory;
    private final String systemName;
    private final EventHubConfig eventHubConfig;
    private final Map<String, EventHubClientManager> eventHubClients = new HashMap();
    private final Map<String, String[]> streamPartitions = new HashMap();
    private final EventHubSamzaOffsetResolver eventHubSamzaOffsetResolver;

    @VisibleForTesting
    /* loaded from: input_file:org/apache/samza/system/eventhub/admin/EventHubSystemAdmin$EventHubSamzaOffsetResolver.class */
    static class EventHubSamzaOffsetResolver implements StartpointVisitor<SystemStreamPartition, String> {
        private final EventHubSystemAdmin eventHubSystemAdmin;
        private final EventHubConfig eventHubConfig;

        EventHubSamzaOffsetResolver(EventHubSystemAdmin eventHubSystemAdmin, EventHubConfig eventHubConfig) {
            this.eventHubSystemAdmin = eventHubSystemAdmin;
            this.eventHubConfig = eventHubConfig;
        }

        public String visit(SystemStreamPartition systemStreamPartition, StartpointSpecific startpointSpecific) {
            return startpointSpecific.getSpecificOffset();
        }

        public String visit(SystemStreamPartition systemStreamPartition, StartpointTimestamp startpointTimestamp) {
            String stream = systemStreamPartition.getStream();
            PartitionReceiver partitionReceiver = null;
            try {
                try {
                    partitionReceiver = this.eventHubSystemAdmin.getOrCreateStreamEventHubClient(stream).getEventHubClient().createReceiverSync(this.eventHubConfig.getStreamConsumerGroup(systemStreamPartition.getSystem(), stream), String.valueOf(systemStreamPartition.getPartition().getPartitionId()), EventPosition.fromEnqueuedTime(Instant.ofEpochMilli(startpointTimestamp.getTimestampOffset().longValue())));
                    ArrayList newArrayList = Lists.newArrayList(partitionReceiver.receiveSync(1));
                    Preconditions.checkState(newArrayList.size() == 1, "Failed to read messages from EventHub system.");
                    String offset = ((EventData) newArrayList.get(0)).getSystemProperties().getOffset();
                    if (partitionReceiver != null) {
                        try {
                            partitionReceiver.closeSync();
                        } catch (EventHubException e) {
                            EventHubSystemAdmin.LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", stream), e);
                        }
                    }
                    return offset;
                } catch (Throwable th) {
                    if (partitionReceiver != null) {
                        try {
                            partitionReceiver.closeSync();
                        } catch (EventHubException e2) {
                            EventHubSystemAdmin.LOG.error(String.format("Exception occurred when closing partition-receiver of the stream: %s", stream), e2);
                        }
                    }
                    throw th;
                }
            } catch (EventHubException e3) {
                EventHubSystemAdmin.LOG.error(String.format("Exception occurred when fetching offset for timestamp: %d from the stream: %s", startpointTimestamp.getTimestampOffset(), stream), e3);
                throw new SamzaException(e3);
            }
        }

        public String visit(SystemStreamPartition systemStreamPartition, StartpointOldest startpointOldest) {
            return EventHubSystemConsumer.START_OF_STREAM;
        }

        public String visit(SystemStreamPartition systemStreamPartition, StartpointUpcoming startpointUpcoming) {
            return EventHubSystemConsumer.END_OF_STREAM;
        }
    }

    public EventHubSystemAdmin(String str, EventHubConfig eventHubConfig, EventHubClientManagerFactory eventHubClientManagerFactory) {
        this.systemName = str;
        this.eventHubConfig = eventHubConfig;
        this.eventHubClientManagerFactory = eventHubClientManagerFactory;
        this.eventHubSamzaOffsetResolver = new EventHubSamzaOffsetResolver(this, eventHubConfig);
    }

    public void stop() {
        for (Map.Entry<String, EventHubClientManager> entry : this.eventHubClients.entrySet()) {
            try {
                entry.getValue().close(DEFAULT_SHUTDOWN_TIMEOUT_MILLIS);
            } catch (Exception e) {
                LOG.warn(String.format("Exception occurred when closing EventHubClient of stream: %s.", entry.getKey()), e);
            }
        }
        this.eventHubClients.clear();
    }

    public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> map) {
        return map;
    }

    private String printEventHubRuntimeInfo(EventHubRuntimeInformation eventHubRuntimeInformation) {
        return eventHubRuntimeInformation == null ? "[EventHubRuntimeInformation: null]" : String.format("[EventHubRuntimeInformation: createAt=%s, partitionCount=%d, path=%s]", eventHubRuntimeInformation.getCreatedAt(), Integer.valueOf(eventHubRuntimeInformation.getPartitionCount()), eventHubRuntimeInformation.getPath());
    }

    private String printPartitionRuntimeInfo(PartitionRuntimeInformation partitionRuntimeInformation) {
        return partitionRuntimeInformation == null ? "[PartitionRuntimeInformation: null]" : "[PartitionRuntimeInformation: eventHubPath=" + partitionRuntimeInformation.getEventHubPath() + " partitionId=" + partitionRuntimeInformation.getPartitionId() + " lastEnqueuedTimeUtc=" + partitionRuntimeInformation.getLastEnqueuedTimeUtc() + " lastEnqueuedOffset=" + partitionRuntimeInformation.getLastEnqueuedOffset() + " numMessages=" + (partitionRuntimeInformation.getLastEnqueuedSequenceNumber() - partitionRuntimeInformation.getBeginSequenceNumber()) + "]";
    }

    public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> set) {
        HashMap hashMap = new HashMap();
        try {
            for (String str : set) {
                if (!this.streamPartitions.containsKey(str)) {
                    LOG.debug(String.format("Partition ids for Stream=%s not found", str));
                    EventHubRuntimeInformation eventHubRuntimeInformation = (EventHubRuntimeInformation) getOrCreateStreamEventHubClient(str).getEventHubClient().getRuntimeInformation().get(this.eventHubConfig.getRuntimeInfoWaitTimeMS(this.systemName), TimeUnit.MILLISECONDS);
                    LOG.info(String.format("Adding partition ids=%s for stream=%s. EHRuntimetInfo=%s", Arrays.toString(eventHubRuntimeInformation.getPartitionIds()), str, printEventHubRuntimeInfo(eventHubRuntimeInformation)));
                    this.streamPartitions.put(str, eventHubRuntimeInformation.getPartitionIds());
                }
                hashMap.put(str, new SystemStreamMetadata(str, getPartitionMetadata(str, this.streamPartitions.get(str))));
            }
            return hashMap;
        } catch (Exception e) {
            String format = String.format("Error while fetching EventHubRuntimeInfo for System:%s", this.systemName);
            LOG.error(format, e);
            throw new SamzaException(format, e);
        }
    }

    public String resolveStartpointToOffset(SystemStreamPartition systemStreamPartition, Startpoint startpoint) {
        return (String) startpoint.apply(systemStreamPartition, this.eventHubSamzaOffsetResolver);
    }

    EventHubClientManager getOrCreateStreamEventHubClient(String str) {
        if (!this.eventHubClients.containsKey(str)) {
            LOG.info(String.format("Creating EventHubClient for Stream=%s", str));
            EventHubClientManager eventHubClientManager = this.eventHubClientManagerFactory.getEventHubClientManager(this.systemName, str, this.eventHubConfig);
            eventHubClientManager.init();
            this.eventHubClients.put(str, eventHubClientManager);
        }
        return this.eventHubClients.get(str);
    }

    private Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> getPartitionMetadata(String str, String[] strArr) {
        EventHubClientManager orCreateStreamEventHubClient = getOrCreateStreamEventHubClient(str);
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (String str2 : strArr) {
            CompletableFuture partitionRuntimeInformation = orCreateStreamEventHubClient.getEventHubClient().getPartitionRuntimeInformation(str2);
            arrayList.add(partitionRuntimeInformation);
            partitionRuntimeInformation.thenAccept(partitionRuntimeInformation2 -> {
                LOG.info(printPartitionRuntimeInfo(partitionRuntimeInformation2));
                hashMap.put(new Partition(Integer.parseInt(str2)), new SystemStreamMetadata.SystemStreamPartitionMetadata(EventHubSystemConsumer.START_OF_STREAM, partitionRuntimeInformation2.getLastEnqueuedOffset(), EventHubSystemConsumer.END_OF_STREAM));
            });
        }
        try {
            CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[arrayList.size()])).get(this.eventHubConfig.getRuntimeInfoWaitTimeMS(this.systemName), TimeUnit.MILLISECONDS);
            return hashMap;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            String format = String.format("Error while fetching EventHubPartitionRuntimeInfo for System:%s, Stream:%s", this.systemName, str);
            LOG.error(format, e);
            throw new SamzaException(format, e);
        }
    }

    public static Integer compareOffsets(String str, String str2) {
        if (str == null || str2 == null || EventHubSystemConsumer.END_OF_STREAM.equals(str) || EventHubSystemConsumer.END_OF_STREAM.equals(str2)) {
            return null;
        }
        try {
            return Integer.valueOf(Long.compare(Long.parseLong(str), Long.parseLong(str2)));
        } catch (NumberFormatException e) {
            return null;
        }
    }

    public Integer offsetComparator(String str, String str2) {
        return compareOffsets(str, str2);
    }
}
