package org.zalando.fahrschein;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.fahrschein.domain.Cursor;
import org.zalando.fahrschein.domain.Lock;
import org.zalando.fahrschein.domain.Subscription;
import org.zalando.fahrschein.http.api.Headers;
import org.zalando.fahrschein.http.api.Request;
import org.zalando.fahrschein.http.api.RequestFactory;
import org.zalando.fahrschein.http.api.Response;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/zalando/fahrschein/NakadiReader.class */
public class NakadiReader<T> implements IORunnable {
    private static final Logger LOG = LoggerFactory.getLogger(NakadiReader.class);
    private static final TypeReference<Collection<Cursor>> COLLECTION_OF_CURSORS = new TypeReference<Collection<Cursor>>() { // from class: org.zalando.fahrschein.NakadiReader.1
    };
    private final URI uri;
    private final RequestFactory requestFactory;
    private final BackoffStrategy backoffStrategy;
    private final CursorManager cursorManager;
    private final Set<String> eventNames;
    private final Optional<Subscription> subscription;
    private final Optional<Lock> lock;
    private final EventReader<T> eventReader;
    private final Listener<T> listener;
    private final BatchHandler batchHandler;
    private final JsonFactory jsonFactory;
    private final ObjectWriter cursorHeaderWriter;
    private final MetricsCollector metricsCollector;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/zalando/fahrschein/NakadiReader$Batch.class */
    public static final class Batch<T> {
        private final Cursor cursor;
        private final List<T> events;

        Batch(Cursor cursor, List<T> list) {
            this.cursor = cursor;
            this.events = list;
        }

        Cursor getCursor() {
            return this.cursor;
        }

        List<T> getEvents() {
            return this.events;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/zalando/fahrschein/NakadiReader$JsonInput.class */
    public static class JsonInput implements Closeable {
        private final JsonFactory jsonFactory;
        private final Response response;
        private JsonParser jsonParser;

        JsonInput(JsonFactory jsonFactory, Response response) {
            this.jsonFactory = jsonFactory;
            this.response = response;
        }

        Response getResponse() {
            return this.response;
        }

        JsonParser getJsonParser() throws IOException {
            if (this.jsonParser == null) {
                this.jsonParser = this.jsonFactory.createParser(this.response.getBody()).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
            }
            return this.jsonParser;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() {
            try {
                if (this.jsonParser != null) {
                    try {
                        NakadiReader.LOG.trace("Trying to close json parser");
                        this.jsonParser.close();
                        NakadiReader.LOG.trace("Closed json parser");
                    } catch (IOException e) {
                        NakadiReader.LOG.warn("Could not close json parser", e);
                    }
                }
                NakadiReader.LOG.trace("Trying to close response");
                this.response.close();
                NakadiReader.LOG.trace("Closed response");
            } catch (Throwable th) {
                NakadiReader.LOG.trace("Trying to close response");
                this.response.close();
                NakadiReader.LOG.trace("Closed response");
                throw th;
            }
        }
    }

    NakadiReader(URI uri, RequestFactory requestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, ObjectMapper objectMapper, Set<String> set, Optional<Subscription> optional, Optional<Lock> optional2, Class<T> cls, Listener<T> listener) {
        this(uri, requestFactory, backoffStrategy, cursorManager, set, optional, optional2, new MappingEventReader(cls, objectMapper), listener, DefaultBatchHandler.INSTANCE, NoMetricsCollector.NO_METRICS_COLLECTOR);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public NakadiReader(URI uri, RequestFactory requestFactory, BackoffStrategy backoffStrategy, CursorManager cursorManager, Set<String> set, Optional<Subscription> optional, Optional<Lock> optional2, EventReader<T> eventReader, Listener<T> listener, BatchHandler batchHandler, MetricsCollector metricsCollector) {
        Preconditions.checkState(optional.isPresent() || set.size() == 1, "Low level api only supports reading from a single event");
        this.uri = uri;
        this.requestFactory = requestFactory;
        this.backoffStrategy = backoffStrategy;
        this.cursorManager = cursorManager;
        this.eventNames = set;
        this.subscription = optional;
        this.lock = optional2;
        this.eventReader = eventReader;
        this.listener = listener;
        this.batchHandler = batchHandler;
        this.metricsCollector = metricsCollector;
        this.jsonFactory = DefaultObjectMapper.INSTANCE.getFactory();
        this.cursorHeaderWriter = DefaultObjectMapper.INSTANCE.writerFor(COLLECTION_OF_CURSORS);
    }

    private static Optional<String> getStreamId(Response response) {
        Headers headers = response.getHeaders();
        return Optional.ofNullable(headers == null ? null : headers.getFirst("X-Nakadi-StreamId"));
    }

    private JsonInput openJsonInput() throws IOException {
        String cursorsHeader = getCursorsHeader();
        Request createRequest = this.requestFactory.createRequest(this.uri, "GET");
        if (cursorsHeader != null) {
            createRequest.getHeaders().put("X-Nakadi-Cursors", cursorsHeader);
        }
        Response execute = createRequest.execute();
        try {
            Optional<String> streamId = getStreamId(execute);
            if (this.subscription.isPresent() && streamId.isPresent()) {
                this.cursorManager.addStreamId(this.subscription.get(), streamId.get());
            }
            return new JsonInput(this.jsonFactory, execute);
        } catch (Throwable th) {
            try {
                execute.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Nullable
    private String getCursorsHeader() throws IOException {
        if (this.subscription.isPresent()) {
            return null;
        }
        Collection<Cursor> lockedCursors = getLockedCursors();
        if (lockedCursors.isEmpty()) {
            return null;
        }
        return this.cursorHeaderWriter.writeValueAsString(lockedCursors);
    }

    private Collection<Cursor> getLockedCursors() throws IOException {
        Collection<Cursor> cursors = this.cursorManager.getCursors(this.eventNames.iterator().next());
        if (!this.lock.isPresent()) {
            return cursors;
        }
        Map map = (Map) cursors.stream().collect(Collectors.toMap((v0) -> {
            return v0.getPartition();
        }, (v0) -> {
            return v0.getOffset();
        }));
        return (Collection) this.lock.get().getPartitions().stream().map(partition -> {
            return new Cursor(partition.getPartition(), (String) map.getOrDefault(partition.getPartition(), "BEGIN"));
        }).collect(Collectors.toList());
    }

    private String getCurrentEventName(Cursor cursor) {
        String eventType = cursor.getEventType();
        return eventType != null ? eventType : this.eventNames.iterator().next();
    }

    private void processBatch(final Batch<T> batch) throws IOException {
        final Cursor cursor = batch.getCursor();
        final String currentEventName = getCurrentEventName(cursor);
        this.batchHandler.processBatch(new IORunnable() { // from class: org.zalando.fahrschein.NakadiReader.2
            @Override // org.zalando.fahrschein.IORunnable
            public void run() throws IOException {
                try {
                    NakadiReader.this.listener.accept(batch.getEvents());
                    NakadiReader.this.cursorManager.onSuccess(currentEventName, cursor);
                } catch (EventAlreadyProcessedException e) {
                    NakadiReader.LOG.info("Events for [{}] partition [{}] at offset [{}] were already processed", new Object[]{currentEventName, cursor.getPartition(), cursor.getOffset()});
                } catch (Throwable th) {
                    NakadiReader.LOG.warn("Exception while processing events for [{}] on partition [{}] at offset [{}]", new Object[]{currentEventName, cursor.getPartition(), cursor.getOffset(), th});
                    throw th;
                }
            }
        });
    }

    /* JADX WARN: Removed duplicated region for block: B:20:0x00bc A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00c4 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:27:0x00cd A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:30:0x00d6 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:33:0x00b4 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private org.zalando.fahrschein.domain.Cursor readCursor(com.fasterxml.jackson.core.JsonParser r8) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 303
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.zalando.fahrschein.NakadiReader.readCursor(com.fasterxml.jackson.core.JsonParser):org.zalando.fahrschein.domain.Cursor");
    }

    @Override // org.zalando.fahrschein.IORunnable
    public void run() throws IOException {
        try {
            runInternal();
        } catch (BackoffException e) {
            throw e.getCause();
        }
    }

    void runInternal() throws IOException, BackoffException {
        JsonParser jsonParser;
        LOG.info("Starting to listen for events for {}", this.eventNames);
        JsonInput openJsonInput = openJsonInput();
        int i = 0;
        while (true) {
            try {
                jsonParser = openJsonInput.getJsonParser();
            } catch (IOException e) {
                boolean isInterrupted = Thread.currentThread().isInterrupted();
                this.metricsCollector.markErrorWhileConsuming();
                if (i > 0) {
                    LOG.warn("Got [{}] [{}] while reading events for {} after [{}] retries", new Object[]{e.getClass().getSimpleName(), e.getMessage(), this.eventNames, Integer.valueOf(i), e});
                } else {
                    LOG.info("Got [{}] [{}] while reading events for {}", new Object[]{e.getClass().getSimpleName(), e.getMessage(), this.eventNames, e});
                }
                openJsonInput.close();
                if (isInterrupted || Thread.currentThread().isInterrupted()) {
                    LOG.warn("Thread was interrupted");
                }
                try {
                    LOG.debug("Reconnecting after [{}] errors", Integer.valueOf(i));
                    openJsonInput = (JsonInput) this.backoffStrategy.call(i, e, this::openJsonInput);
                    LOG.info("Reconnected after [{}] errors", Integer.valueOf(i));
                    this.metricsCollector.markReconnection();
                    i++;
                } catch (InterruptedException e2) {
                    LOG.warn("Interrupted during reconnection", e2);
                    Thread.currentThread().interrupt();
                    return;
                }
            } catch (Throwable th) {
                LOG.warn("Got [{}] [{}] while reading events for {}", new Object[]{th.getClass().getSimpleName(), th.getMessage(), this.eventNames, th});
                try {
                    openJsonInput.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th);
                }
                throw th;
            }
            if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedIOException("Interrupted");
                break;
            } else {
                readBatch(jsonParser);
                i = 0;
            }
        }
        LOG.warn("Thread was interrupted");
    }

    void readSingleBatch() throws IOException {
        try {
            JsonInput openJsonInput = openJsonInput();
            try {
                readBatch(openJsonInput.getJsonParser());
                if (openJsonInput != null) {
                    openJsonInput.close();
                }
            } finally {
            }
        } catch (IOException e) {
            this.metricsCollector.markErrorWhileConsuming();
            throw e;
        }
    }

    /* JADX WARN: Removed duplicated region for block: B:17:0x00b1 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:21:0x00bf A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:24:0x00d6 A[SYNTHETIC] */
    /* JADX WARN: Removed duplicated region for block: B:27:0x00a8 A[SYNTHETIC] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void readBatch(com.fasterxml.jackson.core.JsonParser r8) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 355
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.zalando.fahrschein.NakadiReader.readBatch(com.fasterxml.jackson.core.JsonParser):void");
    }
}
