package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.client.ClientHttpRequest;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.zalando.fahrschein.domain.Batch;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Subscription;

/* loaded from: input_file:org/zalando/fahrschein/NakadiReader.class */
public class NakadiReader<T> {
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    private final URI uri;
    private final ClientHttpRequestFactory clientHttpRequestFactory;
    private final BackoffStrategy backoffStrategy;
    private final CursorManager cursorManager;
    private final ObjectMapper objectMapper;
    private final String eventName;
    private final Optional<Subscription> subscription;
    private final Class<T> eventClass;
    private final Listener<T> listener;
    private final JsonFactory jsonFactory;
    private final ObjectReader eventReader;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/zalando/fahrschein/NakadiReader$JsonInput.class */
    public static class JsonInput implements Closeable {
        private final ClientHttpResponse response;
        private final JsonParser jsonParser;

        JsonInput(ClientHttpResponse clientHttpResponse, JsonParser jsonParser) {
            this.response = clientHttpResponse;
            this.jsonParser = jsonParser;
        }

        ClientHttpResponse getResponse() {
            return this.response;
        }

        JsonParser getJsonParser() {
            return this.jsonParser;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                try {
                    NakadiReader.LOG.trace("Trying to close json parser");
                    this.jsonParser.close();
                    NakadiReader.LOG.trace("Closed json parser");
                    NakadiReader.LOG.trace("Trying to close response");
                    this.response.close();
                    NakadiReader.LOG.trace("Closed response");
                } catch (IOException e) {
                    NakadiReader.LOG.warn("Could not close json parser", e);
                    NakadiReader.LOG.trace("Trying to close response");
                    this.response.close();
                    NakadiReader.LOG.trace("Closed response");
                }
            } catch (Throwable th) {
                NakadiReader.LOG.trace("Trying to close response");
                this.response.close();
                NakadiReader.LOG.trace("Closed response");
                throw th;
            }
        }
    }

    public NakadiReader(URI uri, ClientHttpRequestFactory clientHttpRequestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, String str, Optional<Subscription> optional, Class<T> cls, Listener<T> listener) {
        Preconditions.checkState(!optional.isPresent() || str.equals(Iterables.getOnlyElement(optional.get().getEventTypes())), "Only subscriptions to single event types are currently supported");
        this.uri = uri;
        this.clientHttpRequestFactory = clientHttpRequestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.objectMapper = objectMapper;
        this.eventName = str;
        this.subscription = optional;
        this.eventClass = cls;
        this.listener = listener;
        this.jsonFactory = this.objectMapper.getFactory();
        this.eventReader = this.objectMapper.reader().forType(cls);
        if (clientHttpRequestFactory instanceof HttpComponentsClientHttpRequestFactory) {
            LOG.warn("Using [{}] might block during reconnection, please consider using another implementation of ClientHttpRequestFactory", clientHttpRequestFactory.getClass().getName());
        }
    }

    private JsonInput openJsonInput() throws IOException {
        ClientHttpRequest createRequest = this.clientHttpRequestFactory.createRequest(this.uri, HttpMethod.GET);
        if (!this.subscription.isPresent()) {
            Collection<Cursor> cursors = this.cursorManager.getCursors(this.eventName);
            if (!cursors.isEmpty()) {
                createRequest.getHeaders().put("X-Nakadi-Cursors", Collections.singletonList(this.objectMapper.writeValueAsString(cursors)));
            }
        }
        ClientHttpResponse execute = createRequest.execute();
        return new JsonInput(execute, this.jsonFactory.createParser(execute.getBody()).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE));
    }

    private void processBatch(Batch<T> batch) throws IOException {
        Cursor cursor = batch.getCursor();
        try {
            this.listener.accept(batch.getEvents());
            this.cursorManager.onSuccess(this.eventName, cursor);
        } catch (EventAlreadyProcessedException e) {
            LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", new Object[]{this.eventName, cursor.getPartition(), cursor.getOffset()});
        } catch (Throwable th) {
            this.cursorManager.onError(this.eventName, cursor, th);
            throw th;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:14:0x0084 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:18:0x008c A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:21:0x007c A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.zalando.fahrschein.domain.Cursor readCursor(com.fasterxml.jackson.core.JsonParser r6) throws java.io.IOException {
        /*
            r5 = this;
            r0 = 0
            r7 = r0
            r0 = 0
            r8 = r0
            r0 = r5
            r1 = r6
            com.fasterxml.jackson.core.JsonToken r2 = com.fasterxml.jackson.core.JsonToken.START_OBJECT
            r0.expectToken(r1, r2)
        Lc:
            r0 = r6
            com.fasterxml.jackson.core.JsonToken r0 = r0.nextToken()
            com.fasterxml.jackson.core.JsonToken r1 = com.fasterxml.jackson.core.JsonToken.END_OBJECT
            if (r0 == r1) goto La5
            r0 = r6
            java.lang.String r0 = r0.getCurrentName()
            r9 = r0
            r0 = r9
            r10 = r0
            r0 = -1
            r11 = r0
            r0 = r10
            int r0 = r0.hashCode()
            switch(r0) {
                case -1799810326: goto L44;
                case -1019779949: goto L54;
                default: goto L61;
            }
        L44:
            r0 = r10
            java.lang.String r1 = "partition"
            boolean r0 = r0.equals(r1)
            if (r0 == 0) goto L61
            r0 = 0
            r11 = r0
            goto L61
        L54:
            r0 = r10
            java.lang.String r1 = "offset"
            boolean r0 = r0.equals(r1)
            if (r0 == 0) goto L61
            r0 = 1
            r11 = r0
        L61:
            r0 = r11
            switch(r0) {
                case 0: goto L7c;
                case 1: goto L84;
                default: goto L8c;
            }
        L7c:
            r0 = r6
            java.lang.String r0 = r0.nextTextValue()
            r7 = r0
            goto La2
        L84:
            r0 = r6
            java.lang.String r0 = r0.nextTextValue()
            r8 = r0
            goto La2
        L8c:
            org.slf4j.Logger r0 = org.zalando.fahrschein.NakadiReader.LOG
            java.lang.String r1 = "Unexpected field [{}] in cursor"
            r2 = r9
            r0.warn(r1, r2)
            r0 = r6
            com.fasterxml.jackson.core.JsonToken r0 = r0.nextToken()
            r0 = r6
            com.fasterxml.jackson.core.JsonParser r0 = r0.skipChildren()
        La2:
            goto Lc
        La5:
            r0 = r7
            if (r0 != 0) goto Lb3
            java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
            r1 = r0
            java.lang.String r2 = "Could not read partition from cursor"
            r1.<init>(r2)
            throw r0
        Lb3:
            r0 = r8
            if (r0 != 0) goto Ld7
            java.lang.IllegalStateException r0 = new java.lang.IllegalStateException
            r1 = r0
            java.lang.StringBuilder r2 = new java.lang.StringBuilder
            r3 = r2
            r3.<init>()
            java.lang.String r3 = "Could not read offset from cursor for partition ["
            java.lang.StringBuilder r2 = r2.append(r3)
            r3 = r7
            java.lang.StringBuilder r2 = r2.append(r3)
            java.lang.String r3 = "]"
            java.lang.StringBuilder r2 = r2.append(r3)
            java.lang.String r2 = r2.toString()
            r1.<init>(r2)
            throw r0
        Ld7:
            org.zalando.fahrschein.domain.Cursor r0 = new org.zalando.fahrschein.domain.Cursor
            r1 = r0
            r2 = r7
            r3 = r8
            r1.<init>(r2, r3)
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: org.zalando.fahrschein.NakadiReader.readCursor(com.fasterxml.jackson.core.JsonParser):org.zalando.fahrschein.domain.Cursor");
    }

    private List<T> readEvents(JsonParser jsonParser) throws IOException {
        expectToken(jsonParser, JsonToken.START_ARRAY);
        jsonParser.clearCurrentToken();
        Iterator<T> readValues = this.eventReader.readValues(jsonParser, this.eventClass);
        ArrayList arrayList = new ArrayList();
        while (readValues.hasNext()) {
            readEvent(readValues, arrayList);
        }
        return arrayList;
    }

    private void readEvent(Iterator<T> it, List<T> list) throws JsonMappingException {
        try {
            list.add(it.next());
        } catch (RuntimeJsonMappingException e) {
            JsonMappingException cause = e.getCause();
            if (!(cause instanceof JsonMappingException)) {
                throw e;
            }
            this.listener.onMappingException(cause);
        }
    }

    public void run() throws IOException {
        run(-1L, TimeUnit.MILLISECONDS);
    }

    public void run(long j, TimeUnit timeUnit) throws IOException {
        try {
            runInternal(j, timeUnit);
        } catch (BackoffException e) {
            throw e.getCause();
        }
    }

    @VisibleForTesting
    void runInternal(long j, TimeUnit timeUnit) throws IOException, BackoffException {
        long currentTimeMillis = j <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeUnit.toMillis(j);
        LOG.info("Starting to listen for events for [{}]", this.eventName);
        JsonInput openJsonInput = openJsonInput();
        JsonParser jsonParser = openJsonInput.getJsonParser();
        int i = 0;
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                LOG.debug("Waiting for next batch of events for [{}]", this.eventName);
                expectToken(jsonParser, JsonToken.START_OBJECT);
                expectToken(jsonParser, JsonToken.FIELD_NAME);
                expectField(jsonParser, "cursor");
                Cursor readCursor = readCursor(jsonParser);
                LOG.debug("Cursor for [{}] partition [{}] at offset [{}]", new Object[]{this.eventName, readCursor.getPartition(), readCursor.getOffset()});
                if (jsonParser.nextToken() != JsonToken.END_OBJECT) {
                    expectField(jsonParser, "events");
                    List<T> readEvents = readEvents(jsonParser);
                    expectToken(jsonParser, JsonToken.END_OBJECT);
                    processBatch(new Batch<>(readCursor, Collections.unmodifiableList(readEvents)));
                }
                i = 0;
            } catch (IOException e) {
                if (i > 0) {
                    LOG.warn("Got [{}] while reading events for [{}] after [{}] retries", new Object[]{e.getClass().getSimpleName(), this.eventName, Integer.valueOf(i), e});
                } else {
                    LOG.info("Got [{}] while reading events for [{}]", e.getClass().getSimpleName(), this.eventName);
                }
                openJsonInput.close();
                if (Thread.currentThread().isInterrupted()) {
                    LOG.warn("Thread was interrupted");
                    return;
                }
                try {
                    LOG.debug("Reconnecting after [{}] errors", Integer.valueOf(i));
                    openJsonInput = (JsonInput) this.backoffStrategy.call(i, e, this::openJsonInput);
                    jsonParser = openJsonInput.getJsonParser();
                    LOG.info("Reconnected after [{}] errors", Integer.valueOf(i));
                    i++;
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted during reconnection", e2);
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    private void expectField(JsonParser jsonParser, String str) throws IOException {
        String currentName = jsonParser.getCurrentName();
        Preconditions.checkState(str.equals(currentName), "Expected [%s] field but got [%s]", new Object[]{str, currentName});
    }

    private void expectToken(JsonParser jsonParser, JsonToken jsonToken) throws IOException {
        JsonToken nextToken = jsonParser.nextToken();
        if (nextToken == null) {
            throw new EOFException("Stream was closed");
        }
        Preconditions.checkState(nextToken == jsonToken, "Expected [%s] but got [%s]", new Object[]{jsonToken, nextToken});
    }
}
