package org.apache.flink.streaming.connectors.pulsar.internal;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkOutputMultiplexer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.flink.streaming.connectors.pulsar.internal.metrics.PulsarSourceMetrics;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.util.serialization.PulsarDeserializationSchema;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.shade.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher.class */
public class PulsarFetcher<T> {
    /** Class logger; assigned in the static initializer at the bottom of the class. */
    private static final Logger log;
    /** Watermark mode selected by the constructor when no watermark strategy is supplied. */
    private static final int NO_TIMESTAMPS_WATERMARKS = 0;
    /** Watermark mode selected by the constructor when a watermark strategy is supplied. */
    private static final int WITH_WATERMARK_GENERATOR = 1;
    /** Source context that records are emitted to and that supplies the checkpoint lock. */
    protected final SourceFunction.SourceContext<T> sourceContext;
    /** Topic ranges handed to the constructor together with their starting message ids. */
    protected final Map<TopicRange, MessageId> seedTopicsWithInitialOffsets;
    /** Topic ranges for which reader threads are created with the "exclude start message id" flag set. */
    protected final Set<TopicRange> excludeStartMessageIds;
    /** Lock obtained from {@code sourceContext.getCheckpointLock()}; guards emission and offset updates. */
    private final Object checkpointLock;
    /** State holders for every topic range this fetcher knows about (seed and discovered). */
    protected final List<PulsarTopicState<T>> subscribedPartitionStates;
    /** Queue of topic states that still need a reader thread; also carries the poison marker on cancel. */
    protected final ClosableBlockingQueue<PulsarTopicState<T>> unassignedPartitionsQueue;
    /** One of {@link #NO_TIMESTAMPS_WATERMARKS} or {@link #WITH_WATERMARK_GENERATOR}. */
    private final int timestampWatermarkMode;
    /** Serialized watermark strategy; deserialized once per topic state. May be {@code null}. */
    private final SerializedValue<WatermarkStrategy<T>> watermarkStrategy;
    /** Class loader used to deserialize the watermark strategy. */
    private final ClassLoader userCodeClassLoader;
    /** Runtime context; used here for the task name in reader thread names. */
    private final StreamingRuntimeContext runtimeContext;
    /** Pulsar client configuration passed on to reader threads. */
    protected final ClientConfigurationData clientConf;
    /** Reader configuration passed on to reader threads; never {@code null} after construction. */
    protected final Map<String, Object> readerConf;
    /** Deserialization schema passed on to reader threads. */
    protected final PulsarDeserializationSchema<T> deserializer;
    /** Poll timeout passed on to reader threads (milliseconds, going by the name). */
    protected final int pollTimeoutMs;
    /** Number of retries allowed when committing offsets to Pulsar. */
    private final int commitMaxRetries;
    /** Metadata reader used to set up, commit and remove cursors. */
    protected final PulsarMetadataReader metadataReader;
    /** Watermark output that wraps {@link #sourceContext}. */
    protected final WatermarkOutput watermarkOutput;
    /** Multiplexer combining the per-topic watermark outputs into {@link #watermarkOutput}. */
    private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
    /** Cleared to make the fetch loop and the commit retry loop stop. */
    private volatile boolean running;
    /** Reader threads started by the fetch loop, keyed by topic range; created in {@code runFetchLoop()}. */
    private Map<TopicRange, ReaderThread<T>> topicToThread;
    /** Data-loss flag extracted from (and removed from) the reader configuration. */
    private boolean failOnDataLoss;
    /** "Use earliest on data loss" flag extracted from (and removed from) the reader configuration. */
    private boolean useEarliestWhenDataLoss;
    /** Whether the caller asked for offset metrics to be registered. */
    private final boolean useMetrics;
    /** Metric group that offset gauges are registered under and that watermark components receive. */
    private final MetricGroup consumerMetricGroup;
    /** Decompiled form of the compiler's assertion switch; {@code true} when assertions are off. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.streaming.connectors.pulsar.internal.PulsarFetcher$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher$1.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link OffsetGaugeType}.
     *
     * <p>The array is indexed by enum ordinal and holds the case label assigned to each
     * constant: {@code COMMITTED_OFFSET -> 1}, {@code CURRENT_OFFSET -> 2}. Unmapped
     * ordinals keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$connectors$pulsar$internal$PulsarFetcher$OffsetGaugeType = new int[OffsetGaugeType.values().length];

        static {
            // Each entry is guarded separately so that a constant missing at runtime
            // (NoSuchFieldError) leaves only its own slot unmapped.
            try {
                // WITH_WATERMARK_GENERATOR is used here purely for its value (1);
                // the decompiler substituted the constant for the literal.
                $SwitchMap$org$apache$flink$streaming$connectors$pulsar$internal$PulsarFetcher$OffsetGaugeType[OffsetGaugeType.COMMITTED_OFFSET.ordinal()] = PulsarFetcher.WITH_WATERMARK_GENERATOR;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$connectors$pulsar$internal$PulsarFetcher$OffsetGaugeType[OffsetGaugeType.CURRENT_OFFSET.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher$BreakingException.class */
    /**
     * Marker exception thrown inside the fetch loop to leave it early.
     *
     * <p>It carries no message or cause; only the shared {@link #INSTANCE} is ever thrown.
     */
    public static class BreakingException extends Exception {
        /** The single instance; the constructor is private so no others can be created. */
        static final BreakingException INSTANCE = new BreakingException();

        /** Not instantiable from outside; use {@link #INSTANCE}. */
        private BreakingException() {
            super();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher$OffsetGauge.class */
    public static class OffsetGauge implements Gauge<MessageId> {
        private final PulsarTopicState<?> pts;
        private final OffsetGaugeType gaugeType;

        OffsetGauge(PulsarTopicState<?> pulsarTopicState, OffsetGaugeType offsetGaugeType) {
            this.pts = pulsarTopicState;
            this.gaugeType = offsetGaugeType;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public MessageId m53getValue() {
            switch (AnonymousClass1.$SwitchMap$org$apache$flink$streaming$connectors$pulsar$internal$PulsarFetcher$OffsetGaugeType[this.gaugeType.ordinal()]) {
                case PulsarFetcher.WITH_WATERMARK_GENERATOR /* 1 */:
                    return this.pts.getCommittedOffset();
                case 2:
                    return this.pts.getOffset();
                default:
                    throw new RuntimeException("Unknown gauge type: " + this.gaugeType);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher$OffsetGaugeType.class */
    /** Selects which offset of a topic state an {@code OffsetGauge} reports. */
    public enum OffsetGaugeType {
        /** Report the value of {@code PulsarTopicState.getOffset()}. */
        CURRENT_OFFSET,
        /** Report the value of {@code PulsarTopicState.getCommittedOffset()}. */
        COMMITTED_OFFSET
    }

    /* loaded from: input_file:org/apache/flink/streaming/connectors/pulsar/internal/PulsarFetcher$PeriodicWatermarkEmitter.class */
    private static class PeriodicWatermarkEmitter<T> implements ProcessingTimeCallback {
        private final Object checkpointLock;
        private final List<PulsarTopicState<T>> allPartitions;
        private final WatermarkOutputMultiplexer watermarkOutputMultiplexer;
        private final ProcessingTimeService timerService;
        private final long interval;

        PeriodicWatermarkEmitter(Object obj, List<PulsarTopicState<T>> list, WatermarkOutputMultiplexer watermarkOutputMultiplexer, ProcessingTimeService processingTimeService, long j) {
            this.checkpointLock = obj;
            this.allPartitions = (List) Preconditions.checkNotNull(list);
            this.watermarkOutputMultiplexer = watermarkOutputMultiplexer;
            this.timerService = (ProcessingTimeService) Preconditions.checkNotNull(processingTimeService);
            this.interval = j;
        }

        public void start() {
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }

        public void onProcessingTime(long j) throws Exception {
            synchronized (this.checkpointLock) {
                Iterator<PulsarTopicState<T>> it = this.allPartitions.iterator();
                while (it.hasNext()) {
                    it.next().onPeriodicEmit();
                }
                this.watermarkOutputMultiplexer.onPeriodicEmit();
            }
            this.timerService.registerTimer(this.timerService.getCurrentProcessingTime() + this.interval, this);
        }
    }

    /**
     * Convenience constructor: delegates to the full constructor with an empty set of
     * "exclude start message id" topic ranges and a commit retry limit of 3.
     *
     * @param sourceContext context records are emitted to
     * @param map seed topic ranges with their initial message ids
     * @param serializedValue serialized watermark strategy, or {@code null} for none
     * @param processingTimeService timer service for periodic watermark emission
     * @param j periodic watermark interval; periodic emission is only started when positive
     * @param classLoader class loader used to deserialize the watermark strategy
     * @param streamingRuntimeContext runtime context of the source task
     * @param clientConfigurationData Pulsar client configuration
     * @param map2 reader configuration, may be {@code null}
     * @param i poll timeout for reader threads
     * @param pulsarDeserializationSchema deserialization schema for records
     * @param pulsarMetadataReader metadata reader for cursor management
     * @param metricGroup metric group for the consumer; must not be {@code null}
     * @param z whether offset metrics should be registered
     */
    public PulsarFetcher(SourceFunction.SourceContext<T> sourceContext, Map<TopicRange, MessageId> map, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, StreamingRuntimeContext streamingRuntimeContext, ClientConfigurationData clientConfigurationData, Map<String, Object> map2, int i, PulsarDeserializationSchema<T> pulsarDeserializationSchema, PulsarMetadataReader pulsarMetadataReader, MetricGroup metricGroup, boolean z) throws Exception {
        this(sourceContext, map, Collections.emptySet(), serializedValue, processingTimeService, j, classLoader, streamingRuntimeContext, clientConfigurationData, map2, i, 3, pulsarDeserializationSchema, pulsarMetadataReader, metricGroup, z);
    }

    /**
     * Creates a fetcher for the given seed topic ranges.
     *
     * <p>Besides capturing its arguments, the constructor extracts the two data-loss flags
     * from the reader configuration, builds one state holder per seed topic range, queues
     * them for assignment, optionally registers offset gauges, and starts periodic
     * watermark emission when a watermark strategy is present and the interval is positive.
     *
     * @param sourceContext context records are emitted to; also supplies the checkpoint lock
     * @param map seed topic ranges with their initial message ids
     * @param set topic ranges whose readers are created with the exclude-start flag
     * @param serializedValue serialized watermark strategy, or {@code null} for none
     * @param processingTimeService timer service for periodic watermark emission
     * @param j periodic watermark interval; periodic emission is only started when positive
     * @param classLoader class loader used to deserialize the watermark strategy
     * @param streamingRuntimeContext runtime context of the source task
     * @param clientConfigurationData Pulsar client configuration
     * @param map2 reader configuration; {@code null} is replaced by an empty map
     * @param i poll timeout for reader threads
     * @param i2 maximum number of commit retries
     * @param pulsarDeserializationSchema deserialization schema for records
     * @param pulsarMetadataReader metadata reader for cursor management
     * @param metricGroup metric group for the consumer; must not be {@code null}
     * @param z whether offset metrics should be registered
     * @throws IllegalArgumentException if a seed topic state reports an undefined offset
     */
    public PulsarFetcher(SourceFunction.SourceContext<T> sourceContext, Map<TopicRange, MessageId> map, Set<TopicRange> set, SerializedValue<WatermarkStrategy<T>> serializedValue, ProcessingTimeService processingTimeService, long j, ClassLoader classLoader, StreamingRuntimeContext streamingRuntimeContext, ClientConfigurationData clientConfigurationData, Map<String, Object> map2, int i, int i2, PulsarDeserializationSchema<T> pulsarDeserializationSchema, PulsarMetadataReader pulsarMetadataReader, MetricGroup metricGroup, boolean z) throws Exception {
        // Field defaults.
        this.running = true;
        this.failOnDataLoss = true;

        // Plain captures of constructor arguments (no calls, so order is irrelevant).
        this.sourceContext = sourceContext;
        this.seedTopicsWithInitialOffsets = map;
        this.excludeStartMessageIds = set;
        this.watermarkStrategy = serializedValue;
        this.userCodeClassLoader = classLoader;
        this.runtimeContext = streamingRuntimeContext;
        this.clientConf = clientConfigurationData;
        this.pollTimeoutMs = i;
        this.commitMaxRetries = i2;
        this.deserializer = pulsarDeserializationSchema;
        this.metadataReader = pulsarMetadataReader;
        this.useMetrics = z;
        this.timestampWatermarkMode = serializedValue != null ? WITH_WATERMARK_GENERATOR : NO_TIMESTAMPS_WATERMARKS;

        // Collaborators that require calls; kept in the original relative order.
        this.watermarkOutput = new SourceContextWatermarkOutputAdapter(sourceContext);
        this.watermarkOutputMultiplexer = new WatermarkOutputMultiplexer(this.watermarkOutput);
        this.consumerMetricGroup = Preconditions.checkNotNull(metricGroup);
        this.checkpointLock = sourceContext.getCheckpointLock();

        if (map2 != null) {
            this.readerConf = map2;
        } else {
            this.readerConf = new HashMap<>();
        }
        // Both helpers read their flag from readerConf and remove the key (per their names).
        this.failOnDataLoss = SourceSinkUtils.getFailOnDataLossAndRemoveKey(this.readerConf);
        this.useEarliestWhenDataLoss = SourceSinkUtils.getUseEarliestWhenDataLossAndRemoveKey(this.readerConf);

        this.unassignedPartitionsQueue = new ClosableBlockingQueue<>();
        this.subscribedPartitionStates = createPartitionStateHolders(map, this.timestampWatermarkMode, serializedValue, classLoader);

        // Validate every seed state before any of them is queued.
        for (PulsarTopicState<T> seedState : this.subscribedPartitionStates) {
            if (!seedState.isOffsetDefined()) {
                throw new IllegalArgumentException("The fetcher was assigned seed partitions with undefined initial offsets.");
            }
        }
        for (PulsarTopicState<T> seedState : this.subscribedPartitionStates) {
            this.unassignedPartitionsQueue.add(seedState);
        }

        if (z) {
            registerOffsetMetrics(metricGroup, this.subscribedPartitionStates);
        }

        boolean emitPeriodically = this.timestampWatermarkMode == WITH_WATERMARK_GENERATOR && j > 0;
        if (emitPeriodically) {
            new PeriodicWatermarkEmitter<>(this.checkpointLock, this.subscribedPartitionStates, this.watermarkOutputMultiplexer, processingTimeService, j).start();
        }
    }

    /**
     * Runs the main fetch loop until the fetcher is cancelled or fails.
     *
     * <p>Each iteration rethrows any error reported through the {@code ExceptionProxy},
     * takes a batch of unassigned topic states from the queue (the {@code 5000L} argument
     * is presumably a timeout in milliseconds), discards poison markers, and starts reader
     * threads for what remains. When the batch is empty, reader threads that are no longer
     * running are dropped. If no reader threads are left and the queue is empty, the loop
     * blocks for the next queue element and stops if that element is the poison marker.
     *
     * <p>On every exit path (normal, early break, interruption, error) {@code running} is
     * cleared, the interrupt flag is reset and all reader threads are cancelled and joined.
     * The decompiled source repeated that shutdown sequence three times with diverging
     * catch handling; it is expressed once here as a {@code finally} block.
     *
     * @throws InterruptedException if the loop is interrupted and no reader error is pending
     * @throws Exception any error reported by a reader thread or raised by the loop itself
     */
    public void runFetchLoop() throws Exception {
        this.topicToThread = new HashMap<>();
        ExceptionProxy exceptionProxy = new ExceptionProxy(Thread.currentThread());
        try {
            while (this.running) {
                // Surface failures from reader threads before doing more work.
                exceptionProxy.checkAndThrowException();

                List<PulsarTopicState<T>> topicsToAssign = this.unassignedPartitionsQueue.getBatchBlocking(5000L);
                topicsToAssign.removeIf(state -> state.equals(PoisonState.INSTANCE));

                if (topicsToAssign.isEmpty()) {
                    // Nothing to assign: forget readers that have stopped.
                    this.topicToThread.values().removeIf(reader -> !reader.isRunning());
                } else {
                    if (!this.running) {
                        throw BreakingException.INSTANCE;
                    }
                    this.topicToThread.putAll(createAndStartReaderThread(topicsToAssign, exceptionProxy));
                }

                if (this.topicToThread.size() == 0 && this.unassignedPartitionsQueue.isEmpty()) {
                    // Idle: block until a topic state (or the poison marker) arrives.
                    PulsarTopicState<T> next = this.unassignedPartitionsQueue.getElementBlocking();
                    if (next.equals(PoisonState.INSTANCE)) {
                        throw BreakingException.INSTANCE;
                    }
                    this.topicToThread.putAll(createAndStartReaderThread(ImmutableList.of(next), exceptionProxy));
                }
            }
        } catch (BreakingException ignored) {
            // Regular early exit from the loop; shutdown happens in finally.
        } catch (InterruptedException e) {
            // The interrupt may have been triggered by a failing reader thread; prefer
            // that root cause over the InterruptedException itself.
            exceptionProxy.checkAndThrowException();
            throw e;
        } finally {
            this.running = false;
            // Clear the interrupt flag so the joins below are not aborted immediately.
            Thread.interrupted();
            shutdownReaderThreads();
        }
    }

    /**
     * Cancels all live reader threads and waits for them to finish.
     *
     * <p>Repeats cancel-and-join rounds until no live thread remains, giving each round a
     * total join budget of roughly 500 ms spread across the threads. Stops early, restoring
     * the interrupt flag, if the waiting thread is interrupted. Any other failure is logged
     * and swallowed so that it cannot mask the exception that ended the fetch loop.
     */
    private void shutdownReaderThreads() {
        try {
            int liveThreads;
            do {
                liveThreads = 0;
                this.topicToThread.values().removeIf(reader -> !reader.isAlive());
                for (ReaderThread<T> reader : this.topicToThread.values()) {
                    reader.cancel();
                    liveThreads++;
                }
                if (liveThreads > 0) {
                    for (ReaderThread<T> reader : this.topicToThread.values()) {
                        reader.join((500 / liveThreads) + 1);
                    }
                }
            } while (liveThreads > 0);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Throwable t) {
            log.error("Exception while shutting down reader threads", t);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Emits one record (if any) and advances the topic state's offset, all under the
     * checkpoint lock.
     *
     * <p>A {@code null} record is not emitted, but the offset is still advanced.
     *
     * @param t record to emit, or {@code null} to only advance the offset
     * @param pulsarTopicState state of the topic range the record came from
     * @param messageId message id stored as the new offset
     * @param j timestamp passed to the state's timestamp extraction
     */
    public void emitRecordsWithTimestamps(T t, PulsarTopicState<T> pulsarTopicState, MessageId messageId, long j) {
        synchronized (this.checkpointLock) {
            if (t == null) {
                pulsarTopicState.setOffset(messageId);
                return;
            }
            final long timestamp = pulsarTopicState.extractTimestamp(t, j);
            this.sourceContext.collectWithTimestamp(t, timestamp);
            pulsarTopicState.onEvent(t, timestamp);
            // The offset moves only after the record has been handed downstream.
            pulsarTopicState.setOffset(messageId);
        }
    }

    /**
     * Signals the fetch loop to stop and removes the cursors of all subscribed topic ranges.
     *
     * <p>The poison marker is enqueued in a {@code finally} block. The fetch loop can be
     * blocked waiting for a queue element, so it must be woken up even when cursor removal
     * fails; previously a failure in {@code removeCursor} skipped the wake-up.
     *
     * @throws Exception if removing the cursors fails for a reason other than the metadata
     *     reader already being closed
     */
    public void cancel() throws Exception {
        this.running = false;
        try {
            Set<TopicRange> subscribedRanges = this.subscribedPartitionStates.stream()
                    .map(PulsarTopicState::getTopicRange)
                    .collect(Collectors.toSet());
            this.metadataReader.removeCursor(subscribedRanges);
        } catch (PulsarMetadataReader.ClosedException ignored) {
            // The metadata reader is already closed; there is nothing left to remove.
        } finally {
            this.unassignedPartitionsQueue.addIfOpen(PoisonState.INSTANCE);
        }
    }

    /**
     * Commits the given offsets to Pulsar, skipping entries whose message id is
     * {@code MessageId.earliest} or {@code MessageId.latest}.
     *
     * @param map offsets to commit, keyed by topic range
     * @param pulsarCommitCallback callback informed about the outcome of the commit
     * @throws InterruptedException declared by the underlying commit routine
     */
    public void commitOffsetToPulsar(Map<TopicRange, MessageId> map, PulsarCommitCallback pulsarCommitCallback) throws InterruptedException {
        Map<TopicRange, MessageId> committable = removeEarliestAndLatest(map);
        doCommitOffsetToPulsar(committable, pulsarCommitCallback);
    }

    /**
     * Commits offsets to the Pulsar cursor, retrying on failure.
     *
     * <p>While the fetcher is running, the commit is attempted until it succeeds or has
     * been retried {@code commitMaxRetries} times, sleeping 1000 ms between attempts. On
     * success the callback's {@code onSuccess()} is invoked and the committed offset of
     * every subscribed topic state present in {@code map} is updated. On final failure the
     * callback's {@code onException} is invoked once, unless the fetcher has been stopped
     * in the meantime, in which case the failure is dropped.
     *
     * <p>Fixes over the decompiled form: the success flag is a real {@code boolean}; the
     * failure handler wraps the whole retry loop, so exhausting the retries ends the method
     * instead of spinning without delay; and an interrupt received while sleeping restores
     * the thread's interrupt flag before the failure is reported.
     *
     * @param map offsets to commit, keyed by topic range
     * @param pulsarCommitCallback callback informed about success or failure
     * @throws InterruptedException kept for signature compatibility
     */
    protected void doCommitOffsetToPulsar(Map<TopicRange, MessageId> map, PulsarCommitCallback pulsarCommitCallback) throws InterruptedException {
        boolean committed = false;
        try {
            int retries = 0;
            while (this.running) {
                try {
                    this.metadataReader.commitOffsetToCursor(map);
                    committed = true;
                    break;
                } catch (Exception e) {
                    log.warn("Failed to commit cursor to Pulsar.", e);
                    if (retries >= this.commitMaxRetries) {
                        log.error("Failed to commit cursor to Pulsar after {} attempts", retries);
                        throw e;
                    }
                    retries++;
                    Thread.sleep(1000L);
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Do not lose the interrupt just because it is reported via the callback.
                Thread.currentThread().interrupt();
            }
            if (this.running) {
                pulsarCommitCallback.onException(e);
            }
            return;
        }

        if (committed) {
            pulsarCommitCallback.onSuccess();
            for (PulsarTopicState<T> state : this.subscribedPartitionStates) {
                MessageId committedId = map.get(state.getTopicRange());
                if (committedId != null) {
                    state.setCommittedOffset(committedId);
                }
            }
        }
    }

    /**
     * Returns a copy of the given offsets without the entries whose message id equals
     * {@code MessageId.earliest} or {@code MessageId.latest}.
     *
     * @param map offsets keyed by topic range; not modified
     * @return a new map containing only the remaining entries
     */
    public Map<TopicRange, MessageId> removeEarliestAndLatest(Map<TopicRange, MessageId> map) {
        Map<TopicRange, MessageId> concreteOffsets = new HashMap<>();
        for (Map.Entry<TopicRange, MessageId> offsetEntry : map.entrySet()) {
            MessageId id = offsetEntry.getValue();
            if (id.equals(MessageId.earliest) || id.equals(MessageId.latest)) {
                continue;
            }
            concreteOffsets.put(offsetEntry.getKey(), id);
        }
        return concreteOffsets;
    }

    /**
     * Adds newly discovered topic ranges to the fetcher.
     *
     * <p>Every new range starts at {@code MessageId.earliest}. A state holder is created
     * for each range, recorded in the subscribed states and queued for assignment to a
     * reader thread.
     *
     * <p>When metrics are enabled, offset gauges are now registered for the new states too.
     * Previously only the seed topics passed to the constructor got gauges, and the
     * {@code useMetrics} flag was never consulted after construction.
     *
     * @param set newly discovered topic ranges
     * @throws IOException if the watermark strategy cannot be deserialized
     * @throws ClassNotFoundException if a class of the watermark strategy cannot be loaded
     */
    public void addDiscoveredTopics(Set<TopicRange> set) throws IOException, ClassNotFoundException {
        Map<TopicRange, MessageId> initialOffsets = set.stream()
                .collect(Collectors.toMap(range -> range, range -> MessageId.earliest));
        List<PulsarTopicState<T>> discoveredStates = createPartitionStateHolders(initialOffsets, this.timestampWatermarkMode, this.watermarkStrategy, this.userCodeClassLoader);

        if (this.useMetrics) {
            registerOffsetMetrics(this.consumerMetricGroup, discoveredStates);
        }

        for (PulsarTopicState<T> state : discoveredStates) {
            // Record the state first, then make it available for assignment.
            this.subscribedPartitionStates.add(state);
            this.unassignedPartitionsQueue.add(state);
        }
    }

    /**
     * Captures the current offset of every subscribed topic state.
     *
     * <p>When assertions are enabled, the caller must hold the checkpoint lock.
     *
     * @return a new map from topic range to that state's current offset
     */
    public Map<TopicRange, MessageId> snapshotCurrentState() {
        // Decompiled form of: assert Thread.holdsLock(checkpointLock);
        boolean lockCheckPassed = $assertionsDisabled || Thread.holdsLock(this.checkpointLock);
        if (!lockCheckPassed) {
            throw new AssertionError();
        }
        Map<TopicRange, MessageId> snapshot = new HashMap<>(this.subscribedPartitionStates.size());
        this.subscribedPartitionStates.forEach(state -> snapshot.put(state.getTopicRange(), state.getOffset()));
        return snapshot;
    }

    /**
     * Sets up cursors for the given topic states and starts one daemon reader thread per
     * state.
     *
     * @param list topic states to start readers for
     * @param exceptionProxy proxy handed to every reader thread for error reporting
     * @return the started threads keyed by topic range
     */
    public Map<TopicRange, ReaderThread<T>> createAndStartReaderThread(List<PulsarTopicState<T>> list, ExceptionProxy exceptionProxy) {
        // Cursors for the whole batch are set up before any thread is started.
        Map<TopicRange, MessageId> startOffsets = list.stream()
                .collect(Collectors.toMap(PulsarTopicState::getTopicRange, PulsarTopicState::getOffset));
        this.metadataReader.setupCursor(startOffsets, this.failOnDataLoss);

        Map<TopicRange, ReaderThread<T>> startedThreads = new HashMap<>();
        for (PulsarTopicState<T> state : list) {
            ReaderThread<T> reader = createReaderThread(exceptionProxy, state);
            String threadName = String.format("Pulsar Reader for %s in task %s", state.getTopicRange(), this.runtimeContext.getTaskName());
            reader.setName(threadName);
            reader.setDaemon(true);
            reader.start();
            log.info("Starting Thread {}", reader.getName());
            startedThreads.put(state.getTopicRange(), reader);
        }
        return startedThreads;
    }

    /**
     * Returns the fetcher's list of subscribed topic states.
     *
     * @return the internal list itself, not a copy
     */
    protected List<PulsarTopicState<T>> getSubscribedTopicStates() {
        return subscribedPartitionStates;
    }

    /**
     * Builds (but does not start) the reader thread for one topic state.
     *
     * @param exceptionProxy proxy the thread reports failures to
     * @param pulsarTopicState topic state the thread will read for
     * @return a new, unstarted reader thread configured from this fetcher's settings
     */
    protected ReaderThread<T> createReaderThread(ExceptionProxy exceptionProxy, PulsarTopicState pulsarTopicState) {
        // Whether this topic range was listed in excludeStartMessageIds.
        boolean excludeStartMessageId = this.excludeStartMessageIds.contains(pulsarTopicState.getTopicRange());
        return new ReaderThread<>(
                this,
                pulsarTopicState,
                this.clientConf,
                this.readerConf,
                this.deserializer,
                this.pollTimeoutMs,
                exceptionProxy,
                this.failOnDataLoss,
                this.useEarliestWhenDataLoss,
                excludeStartMessageId);
    }

    /**
     * Creates one state holder per topic range, initialised with the given offset.
     *
     * <p>In {@code NO_TIMESTAMPS_WATERMARKS} mode plain {@code PulsarTopicState} objects are
     * created. In {@code WITH_WATERMARK_GENERATOR} mode each range gets its own
     * deserialized copy of the watermark strategy and its own output registered with the
     * watermark multiplexer under the range's string form.
     *
     * @param map topic ranges with their initial message ids
     * @param i watermark mode, one of the two mode constants
     * @param serializedValue serialized watermark strategy; only used in generator mode
     * @param classLoader class loader used to deserialize the strategy
     * @return the created state holders, backed by a {@code CopyOnWriteArrayList}
     * @throws RuntimeException if {@code i} is not a known mode
     */
    private List<PulsarTopicState<T>> createPartitionStateHolders(Map<TopicRange, MessageId> map, int i, SerializedValue<WatermarkStrategy<T>> serializedValue, ClassLoader classLoader) throws IOException, ClassNotFoundException {
        // Raw types are kept as in the decompiled source; the element types of the
        // project classes involved are not visible from here.
        CopyOnWriteArrayList holders = new CopyOnWriteArrayList();

        if (i == NO_TIMESTAMPS_WATERMARKS) {
            for (Map.Entry<TopicRange, MessageId> seed : map.entrySet()) {
                PulsarTopicState plainState = new PulsarTopicState(seed.getKey());
                plainState.setOffset(seed.getValue());
                holders.add(plainState);
            }
            return holders;
        }

        if (i != WITH_WATERMARK_GENERATOR) {
            throw new RuntimeException();
        }

        for (Map.Entry<TopicRange, MessageId> seed : map.entrySet()) {
            TopicRange range = seed.getKey();
            PulsarTopicState baseState = new PulsarTopicState(range);
            // Each topic range gets its own strategy instance.
            WatermarkStrategy strategy = (WatermarkStrategy) serializedValue.deserializeValue(classLoader);
            String outputId = baseState.getTopicRange().toString();
            this.watermarkOutputMultiplexer.registerNewOutput(outputId);
            PulsarTopicPartitionStateWithWatermarkGenerator watermarkedState = new PulsarTopicPartitionStateWithWatermarkGenerator(
                    range,
                    baseState,
                    strategy.createTimestampAssigner(() -> {
                        return this.consumerMetricGroup;
                    }),
                    strategy.createWatermarkGenerator(() -> {
                        return this.consumerMetricGroup;
                    }),
                    this.watermarkOutputMultiplexer.getImmediateOutput(outputId),
                    this.watermarkOutputMultiplexer.getDeferredOutput(outputId));
            watermarkedState.setOffset(seed.getValue());
            holders.add(watermarkedState);
        }
        return holders;
    }

    /**
     * Registers a current-offset and a committed-offset gauge for each topic state.
     *
     * <p>The gauges are placed in a {@code "topic"} sub-group keyed by the topic name of
     * the state's topic range.
     *
     * @param metricGroup parent group the per-topic groups are added to
     * @param list topic states to expose
     */
    private void registerOffsetMetrics(MetricGroup metricGroup, List<PulsarTopicState<T>> list) {
        list.forEach(state -> {
            String topicName = state.getTopicRange().getTopic();
            MetricGroup topicGroup = metricGroup.addGroup("topic", topicName);
            OffsetGauge currentOffset = new OffsetGauge(state, OffsetGaugeType.CURRENT_OFFSET);
            topicGroup.gauge(PulsarSourceMetrics.CURRENT_OFFSETS_METRICS_GAUGE, currentOffset);
            OffsetGauge committedOffset = new OffsetGauge(state, OffsetGaugeType.COMMITTED_OFFSET);
            topicGroup.gauge(PulsarSourceMetrics.COMMITTED_OFFSETS_METRICS_GAUGE, committedOffset);
        });
    }

    /**
     * Returns the metadata reader this fetcher was constructed with.
     *
     * @return the metadata reader used for cursor management
     */
    public PulsarMetadataReader getMetaDataReader() {
        return metadataReader;
    }

    // Static initialisation as emitted by the decompiler: records whether assertions are
    // disabled for this class and creates the class logger.
    static {
        $assertionsDisabled = !PulsarFetcher.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(PulsarFetcher.class);
    }
}
