package com.urbanairship.connect.client;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.gson.Gson;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.ListenableFuture;
import com.ning.http.client.cookie.Cookie;
import com.ning.http.client.cookie.CookieDecoder;
import com.urbanairship.connect.client.consume.MobileEventStreamBodyConsumer;
import com.urbanairship.connect.client.consume.MobileEventStreamConnectFuture;
import com.urbanairship.connect.client.consume.MobileEventStreamResponseHandler;
import com.urbanairship.connect.client.consume.StatusAndHeaders;
import com.urbanairship.connect.client.model.GsonUtil;
import com.urbanairship.connect.java8.Consumer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/urbanairship/connect/client/MobileEventStream.class */
public class MobileEventStream implements AutoCloseable {
    public static final String X_UA_APPKEY = "X-UA-Appkey";
    private static final String ACCEPT_HEADER = "application/vnd.urbanairship+x-ndjson; version=3;";
    private final StreamQueryDescriptor descriptor;
    private final AsyncHttpClient client;
    private final Consumer<String> eventConsumer;
    private final String url;
    private final FatalExceptionHandler fatalExceptionHandler;
    private final Object stateLock = new Object();
    private volatile Connection connection = null;
    private volatile CountDownLatch bodyConsumeLatch = null;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private static final Logger log = LogManager.getLogger(MobileEventStream.class);
    private static final Gson GSON = GsonUtil.getGson();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/urbanairship/connect/client/MobileEventStream$Connection.class */
    public static final class Connection {
        private final ListenableFuture<Boolean> future;
        private final MobileEventStreamResponseHandler handler;

        public Connection(ListenableFuture<Boolean> listenableFuture, MobileEventStreamResponseHandler mobileEventStreamResponseHandler) {
            this.future = listenableFuture;
            this.handler = mobileEventStreamResponseHandler;
        }
    }

    public MobileEventStream(StreamQueryDescriptor streamQueryDescriptor, AsyncHttpClient asyncHttpClient, Consumer<String> consumer, String str, FatalExceptionHandler fatalExceptionHandler) {
        this.descriptor = streamQueryDescriptor;
        this.client = asyncHttpClient;
        this.eventConsumer = consumer;
        this.url = str;
        this.fatalExceptionHandler = fatalExceptionHandler;
    }

    public void connect(long j, TimeUnit timeUnit) throws InterruptedException {
        synchronized (this.stateLock) {
            Preconditions.checkState(this.connection == null);
            try {
                this.connection = connect(j, timeUnit, Collections.emptyList());
            } catch (ExecutionException e) {
                throw new RuntimeException("Failure attempting to connect to mobile event stream for app " + getAppKey(), e);
            } catch (TimeoutException e2) {
                throw new RuntimeException("Timed out waiting to establish connection to mobile event stream for app " + getAppKey());
            }
        }
    }

    public void consume(long j, TimeUnit timeUnit) throws InterruptedException {
        synchronized (this.stateLock) {
            Preconditions.checkState((this.connection == null || this.closed.get()) ? false : true);
            this.bodyConsumeLatch = new CountDownLatch(1);
            this.connection.future.addListener(new Runnable() { // from class: com.urbanairship.connect.client.MobileEventStream.1
                @Override // java.lang.Runnable
                public void run() {
                    MobileEventStream.this.bodyConsumeLatch.countDown();
                }
            }, MoreExecutors.directExecutor());
        }
        try {
            this.connection.handler.consumeBody();
            if (!this.bodyConsumeLatch.await(j, timeUnit)) {
                log.debug("Hit max consume time for stream for app " + getAppKey());
            }
            Optional<Throwable> error = this.connection.handler.getError();
            if (error.isPresent()) {
                throw new RuntimeException("Error occurred consuming stream for app " + getAppKey(), error.get());
            }
        } finally {
            cleanup();
        }
    }

    private void cleanup() {
        synchronized (this.stateLock) {
            if (this.closed.compareAndSet(false, true)) {
                if (this.bodyConsumeLatch != null) {
                    this.bodyConsumeLatch.countDown();
                }
                if (this.connection != null) {
                    try {
                        this.connection.handler.stop();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    this.connection.future.done();
                }
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        cleanup();
    }

    private Connection connect(long j, TimeUnit timeUnit, Collection<Cookie> collection) throws InterruptedException, ExecutionException, TimeoutException {
        AsyncHttpClient.BoundRequestBuilder buildRequest = buildRequest(collection);
        MobileEventStreamConnectFuture mobileEventStreamConnectFuture = new MobileEventStreamConnectFuture();
        MobileEventStreamResponseHandler mobileEventStreamResponseHandler = new MobileEventStreamResponseHandler(new MobileEventStreamBodyConsumer(this.eventConsumer), mobileEventStreamConnectFuture);
        ListenableFuture execute = buildRequest.execute(mobileEventStreamResponseHandler);
        StatusAndHeaders statusAndHeaders = mobileEventStreamConnectFuture.get(j, timeUnit);
        int statusCode = statusAndHeaders.getStatusCode();
        if (statusCode == 200) {
            return new Connection(execute, mobileEventStreamResponseHandler);
        }
        mobileEventStreamResponseHandler.stop();
        execute.done();
        if (399 < statusCode && statusCode < 500) {
            this.fatalExceptionHandler.handle(new RuntimeException(String.format("Received status code (%d) from a bad request for app %s", Integer.valueOf(statusCode), getAppKey())));
        }
        if (statusCode != 307) {
            throw new RuntimeException(String.format("Received unexpected status code (%d) from request for stream for app %s", Integer.valueOf(statusCode), getAppKey()));
        }
        return handleRedirect(j, timeUnit, statusAndHeaders);
    }

    private AsyncHttpClient.BoundRequestBuilder buildRequest(Collection<Cookie> collection) {
        byte[] query = getQuery();
        AsyncHttpClient.BoundRequestBuilder addHeader = this.client.preparePost(this.url).addHeader("Accept", ACCEPT_HEADER).addHeader("Content-Length", Integer.toString(query.length));
        for (Map.Entry<String, String> entry : getAuthHeaders(this.descriptor.getCreds()).entrySet()) {
            addHeader.addHeader(entry.getKey(), entry.getValue());
        }
        Iterator<Cookie> it = collection.iterator();
        while (it.hasNext()) {
            addHeader.addCookie(it.next());
        }
        addHeader.setBody(query);
        return addHeader;
    }

    private Connection handleRedirect(long j, TimeUnit timeUnit, StatusAndHeaders statusAndHeaders) throws InterruptedException, ExecutionException, TimeoutException {
        List<String> list = statusAndHeaders.getHeaders().get("Set-Cookie");
        if (list == null || list.isEmpty()) {
            throw new RuntimeException("Received redirect response with no 'Set-Cookie' header in response!");
        }
        String str = list.get(0);
        Cookie decode = CookieDecoder.decode(str);
        if (decode == null) {
            throw new RuntimeException("Received redirect response with unparsable 'Set-Cookie' value - " + str);
        }
        return connect(j, timeUnit, ImmutableList.of(decode));
    }

    private Map<String, String> getAuthHeaders(Creds creds) {
        return ImmutableMap.of("Authorization", "Bearer " + creds.getToken(), X_UA_APPKEY, creds.getAppKey());
    }

    private byte[] getQuery() {
        HashMap hashMap = new HashMap();
        if (!this.descriptor.getOffset().isPresent()) {
            hashMap.put("start", "LATEST");
        } else if (this.descriptor.getOffset().get().equals("EARLIEST") || this.descriptor.getOffset().get().equals("LATEST")) {
            hashMap.put("start", this.descriptor.getOffset().get());
        } else {
            hashMap.put("resume_offset", this.descriptor.getOffset().get());
        }
        if (this.descriptor.getSubset().isPresent()) {
            hashMap.put("subset", this.descriptor.getSubset().get());
        }
        if (this.descriptor.getFilters().isPresent()) {
            hashMap.put("filters", this.descriptor.getFilters().get());
        }
        return GSON.toJson(hashMap).getBytes(StandardCharsets.UTF_8);
    }

    private String getAppKey() {
        return this.descriptor.getCreds().getAppKey();
    }
}
