package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.zalando.fahrschein.domain.Batch;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.metrics.MetricsCollector;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/zalando/fahrschein/NakadiReader.class */
public class NakadiReader<T> implements IORunnable {
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    private static final TypeReference<Collection<Cursor>> COLLECTION_OF_CURSORS = new TypeReference<Collection<Cursor>>() { // from class: org.zalando.fahrschein.NakadiReader.1
    };
    private final URI uri;
    private final ClientHttpRequestFactory clientHttpRequestFactory;
    private final BackoffStrategy backoffStrategy;
    private final CursorManager cursorManager;
    private final String eventName;
    private final Optional<Subscription> subscription;
    private final Class<T> eventClass;
    private final Listener<T> listener;
    private final JsonFactory jsonFactory;
    private final ObjectReader eventReader;
    private final ObjectWriter cursorHeaderWriter;
    private final MetricsCollector metricsCollector;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/zalando/fahrschein/NakadiReader$JsonInput.class */
    public static class JsonInput implements Closeable {
        private final ClientHttpResponse response;
        private final JsonParser jsonParser;

        JsonInput(ClientHttpResponse clientHttpResponse, JsonParser jsonParser) {
            this.response = clientHttpResponse;
            this.jsonParser = jsonParser;
        }

        ClientHttpResponse getResponse() {
            return this.response;
        }

        JsonParser getJsonParser() {
            return this.jsonParser;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                try {
                    NakadiReader.LOG.trace("Trying to close json parser");
                    this.jsonParser.close();
                    NakadiReader.LOG.trace("Closed json parser");
                    NakadiReader.LOG.trace("Trying to close response");
                    this.response.close();
                    NakadiReader.LOG.trace("Closed response");
                } catch (IOException e) {
                    NakadiReader.LOG.warn("Could not close json parser", e);
                    NakadiReader.LOG.trace("Trying to close response");
                    this.response.close();
                    NakadiReader.LOG.trace("Closed response");
                }
            } catch (Throwable th) {
                NakadiReader.LOG.trace("Trying to close response");
                this.response.close();
                NakadiReader.LOG.trace("Closed response");
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NakadiReader(URI uri, ClientHttpRequestFactory clientHttpRequestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, String str, Optional<Subscription> optional, Class<T> cls, Listener<T> listener, MetricsCollector metricsCollector) {
        Preconditions.checkState(!optional.isPresent() || str.equals(Iterables.getOnlyElement(optional.get().getEventTypes())), "Only subscriptions to single event types are currently supported");
        this.uri = uri;
        this.clientHttpRequestFactory = clientHttpRequestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.eventName = str;
        this.subscription = optional;
        this.eventClass = cls;
        this.listener = listener;
        this.metricsCollector = metricsCollector;
        this.jsonFactory = objectMapper.getFactory();
        this.eventReader = objectMapper.reader().forType(cls);
        this.cursorHeaderWriter = objectMapper.writerFor(COLLECTION_OF_CURSORS).without(SerializationFeature.INDENT_OUTPUT);
        if (clientHttpRequestFactory instanceof HttpComponentsClientHttpRequestFactory) {
            LOG.warn("Using [{}] might block during reconnection, please consider using another implementation of ClientHttpRequestFactory", clientHttpRequestFactory.getClass().getName());
        }
    }

    private static Optional<String> getStreamId(ClientHttpResponse clientHttpResponse) {
        return Optional.ofNullable(clientHttpResponse.getHeaders()).flatMap(httpHeaders -> {
            return ((List) httpHeaders.getOrDefault("X-Nakadi-StreamId", Collections.emptyList())).stream().findFirst();
        });
    }

    private JsonInput openJsonInput() throws IOException {
        ClientHttpRequest createRequest = this.clientHttpRequestFactory.createRequest(this.uri, HttpMethod.GET);
        if (!this.subscription.isPresent()) {
            Collection<Cursor> cursors = this.cursorManager.getCursors(this.eventName);
            if (!cursors.isEmpty()) {
                createRequest.getHeaders().put("X-Nakadi-Cursors", Collections.singletonList(this.cursorHeaderWriter.writeValueAsString(cursors)));
            }
        }
        ClientHttpResponse execute = createRequest.execute();
        try {
            Optional<String> streamId = getStreamId(execute);
            JsonParser disable = this.jsonFactory.createParser(execute.getBody()).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
            if (this.subscription.isPresent() && streamId.isPresent()) {
                this.cursorManager.addStreamId(this.subscription.get(), streamId.get());
            }
            return new JsonInput(execute, disable);
        } catch (Throwable th) {
            try {
                execute.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private void processBatch(Batch<T> batch) throws IOException {
        Cursor cursor = batch.getCursor();
        try {
            this.listener.accept(batch.getEvents());
            this.cursorManager.onSuccess(this.eventName, cursor);
        } catch (EventAlreadyProcessedException e) {
            LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", new Object[]{this.eventName, cursor.getPartition(), cursor.getOffset()});
        } catch (Throwable th) {
            this.cursorManager.onError(this.eventName, cursor, th);
            throw th;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:20:0x00bc A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00c4 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:27:0x00cd A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:30:0x00d6 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:33:0x00b4 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.zalando.fahrschein.domain.Cursor readCursor(com.fasterxml.jackson.core.JsonParser r8) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 303
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.zalando.fahrschein.NakadiReader.readCursor(com.fasterxml.jackson.core.JsonParser):org.zalando.fahrschein.domain.Cursor");
    }

    private List<T> readEvents(JsonParser jsonParser) throws IOException {
        expectToken(jsonParser, JsonToken.START_ARRAY);
        jsonParser.clearCurrentToken();
        Iterator readValues = this.eventReader.readValues(jsonParser, this.eventClass);
        ArrayList arrayList = new ArrayList();
        while (readValues.hasNext()) {
            try {
                arrayList.add(this.eventClass.cast(readValues.next()));
            } catch (RuntimeException e) {
                JsonMappingException cause = e.getCause();
                if (!(cause instanceof JsonMappingException)) {
                    if (cause instanceof IOException) {
                        throw ((IOException) cause);
                    }
                    throw e;
                }
                this.listener.onMappingException(cause);
            }
        }
        return arrayList;
    }

    @Override // org.zalando.fahrschein.IORunnable
    public void run() throws IOException {
        try {
            runInternal();
        } catch (BackoffException e) {
            throw e.getCause();
        }
    }

    @VisibleForTesting
    void runInternal() throws IOException, BackoffException {
        LOG.info("Starting to listen for events for [{}]", this.eventName);
        JsonInput openJsonInput = openJsonInput();
        JsonParser jsonParser = openJsonInput.getJsonParser();
        int i = 0;
        while (true) {
            try {
                readBatch(jsonParser);
                i = 0;
            } catch (IOException e) {
                this.metricsCollector.markErrorWhileConsuming();
                if (i > 0) {
                    LOG.warn("Got [{}] [{}] while reading events for [{}] after [{}] retries", new Object[]{e.getClass().getSimpleName(), e.getMessage(), this.eventName, Integer.valueOf(i), e});
                } else {
                    LOG.info("Got [{}] [{}] while reading events for [{}]", new Object[]{e.getClass().getSimpleName(), e.getMessage(), this.eventName, e});
                }
                openJsonInput.close();
                if (Thread.currentThread().isInterrupted()) {
                    LOG.warn("Thread was interrupted");
                    return;
                }
                try {
                    LOG.debug("Reconnecting after [{}] errors", Integer.valueOf(i));
                    openJsonInput = (JsonInput) this.backoffStrategy.call(i, e, this::openJsonInput);
                    jsonParser = openJsonInput.getJsonParser();
                    LOG.info("Reconnected after [{}] errors", Integer.valueOf(i));
                    this.metricsCollector.markReconnection();
                    i++;
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted during reconnection", e2);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    @VisibleForTesting
    void readSingleBatch() throws IOException {
        try {
            JsonInput openJsonInput = openJsonInput();
            Throwable th = null;
            try {
                readBatch(openJsonInput.getJsonParser());
                if (openJsonInput != null) {
                    if (0 != 0) {
                        try {
                            openJsonInput.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        openJsonInput.close();
                    }
                }
            } finally {
            }
        } catch (IOException e) {
            this.metricsCollector.markErrorWhileConsuming();
            throw e;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:17:0x00b1 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:21:0x00ba A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00d1 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:27:0x00a8 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void readBatch(com.fasterxml.jackson.core.JsonParser r8) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 345
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.zalando.fahrschein.NakadiReader.readBatch(com.fasterxml.jackson.core.JsonParser):void");
    }

    private void expectToken(JsonParser jsonParser, JsonToken jsonToken) throws IOException {
        JsonToken nextToken = jsonParser.nextToken();
        if (nextToken == null) {
            if (!Thread.currentThread().isInterrupted()) {
                throw new EOFException("Stream was closed");
            }
            throw new InterruptedIOException("Thread was interrupted");
        }
        if (nextToken != jsonToken) {
            throw new IOException(String.format("Expected [%s] but got [%s]", jsonToken, nextToken));
        }
    }
}
