/*
 * Decompiled with CFR 0.152.
 */
package com.urbanairship.connect.client;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.ning.http.client.AsyncHttpClient;
import com.urbanairship.connect.client.ConnectClientConfiguration;
import com.urbanairship.connect.client.FatalExceptionHandler;
import com.urbanairship.connect.client.MobileEventStream;
import com.urbanairship.connect.client.RawEventReceiver;
import com.urbanairship.connect.client.StreamQueryDescriptor;
import com.urbanairship.connect.client.StreamSupplier;
import com.urbanairship.connect.client.StreamUtils;
import com.urbanairship.connect.client.model.responses.Event;
import com.urbanairship.connect.client.offsets.OffsetManager;
import com.urbanairship.connect.java8.Consumer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/**
 * Service that repeatedly opens a {@link MobileEventStream} for one app and consumes it until
 * it is asked to stop.
 *
 * <p>Each loop iteration obtains a stream from the configured {@link StreamSupplier}, connects
 * via {@link StreamUtils#connectWithRetries}, consumes for at most
 * {@code config.maxAppStreamConsumeTime} milliseconds, then records the receiver's current value
 * with the {@link OffsetManager}. Iterations after the first build their query descriptor from
 * the base descriptor and the offset manager.
 *
 * <p>Instances are created through {@link #newBuilder()}.
 */
public class MobileEventConsumerService
extends AbstractExecutionThreadService {
    private static final Logger log = LogManager.getLogger(MobileEventConsumerService.class);
    private final OffsetManager offsetManager;
    private final ConnectClientConfiguration config;
    private final AtomicBoolean doConsume;
    private final AsyncHttpClient asyncClient;
    private final RawEventReceiver rawEventReceiver;
    private final StreamQueryDescriptor baseStreamQueryDescriptor;
    private final StreamSupplier supplier;
    private final FatalExceptionHandler fatalExceptionHandler;
    // Written by the consuming loop and read by triggerShutdown(); volatile so that a shutdown
    // request issued from another thread sees the stream that is currently open.
    private volatile MobileEventStream mobileEventStream;

    /**
     * Creates a new, empty {@link Builder}.
     *
     * @return a builder for {@code MobileEventConsumerService}
     */
    public static Builder newBuilder() {
        return new Builder();
    }

    private MobileEventConsumerService(Consumer<Event> consumer, AsyncHttpClient client, OffsetManager offsetManager, StreamQueryDescriptor baseStreamQueryDescriptor, Configuration config, StreamSupplier supplier, FatalExceptionHandler fatalExceptionHandler) {
        this.offsetManager = offsetManager;
        this.config = new ConnectClientConfiguration(config);
        this.asyncClient = client;
        this.doConsume = new AtomicBoolean(true);
        this.rawEventReceiver = new RawEventReceiver(consumer);
        this.baseStreamQueryDescriptor = baseStreamQueryDescriptor;
        this.supplier = supplier;
        this.fatalExceptionHandler = fatalExceptionHandler;
    }

    /**
     * Exposes the flag that keeps the consuming loop running.
     *
     * @return the live flag (not a copy); setting it to {@code false} ends the loop after the
     *         current iteration
     */
    public AtomicBoolean getDoConsume() {
        return this.doConsume;
    }

    /**
     * Service body: marks the service as consuming and runs the loop until it is stopped.
     */
    @Override
    public void run() {
        this.doConsume.set(true);
        this.stream();
    }

    /**
     * Connect/consume loop.
     *
     * <p>Runs while {@code doConsume} is set. The loop ends early when the stream cannot be
     * connected (the failure is reported to the {@link FatalExceptionHandler}) or when the
     * thread is interrupted (the interrupt flag is restored). Any other throwable raised while
     * connecting, consuming or closing the stream is logged and the loop tries again.
     *
     * <p>After every iteration the attempt counter is advanced exactly once and, if the stream
     * had been connected, the offset is updated exactly once. The HTTP client is closed when
     * the loop exits, however it exits.
     */
    private void stream() {
        int consumptionAttempt = 0;
        try {
            while (this.doConsume.get()) {
                boolean connected = false;
                // The first attempt uses the caller's descriptor as-is; later attempts derive a
                // new one from the base descriptor and the offset manager.
                StreamQueryDescriptor descriptor = consumptionAttempt == 0
                        ? this.baseStreamQueryDescriptor
                        : StreamUtils.buildNewDescriptor(this.baseStreamQueryDescriptor, this.offsetManager);
                try (MobileEventStream newMobileEventStream = this.supplier.get(descriptor, this.asyncClient, this.rawEventReceiver, this.config.mesUrl, this.fatalExceptionHandler)) {
                    // Publish the stream so triggerShutdown() can close it from another thread.
                    this.mobileEventStream = newMobileEventStream;
                    log.info("Connecting to stream for app " + this.appKey());
                    connected = StreamUtils.connectWithRetries(newMobileEventStream, this.config, this.appKey());
                    if (!connected) {
                        this.fatalExceptionHandler.handle(new RuntimeException("Could not connect to stream for app " + this.appKey()));
                        return;
                    }
                    log.info("Consuming from stream for app " + this.appKey());
                    newMobileEventStream.consume(this.config.maxAppStreamConsumeTime, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    // Restore the interrupt flag for whoever owns this thread and stop looping.
                    Thread.currentThread().interrupt();
                    return;
                }
                catch (Throwable throwable) {
                    // Log and fall through to the finally block; the loop then reconnects.
                    log.error("Error encountered while consuming stream for app " + this.appKey(), throwable);
                }
                finally {
                    // Per-iteration bookkeeping lives only here so it runs once on every exit
                    // path. This block must complete normally (no continue/break/return),
                    // otherwise it would cancel the returns above.
                    log.info("Ending stream handling for app " + this.appKey());
                    ++consumptionAttempt;
                    if (connected) {
                        log.debug("Updating offset for app " + this.appKey());
                        this.offsetManager.update(this.rawEventReceiver.get());
                    }
                }
            }
        }
        finally {
            this.asyncClient.close();
        }
    }

    /**
     * Requests that the consuming loop stop: clears {@code doConsume} and closes the stream
     * that is currently open, if any.
     *
     * @throws RuntimeException wrapping any exception thrown while closing the stream
     */
    @Override
    public void triggerShutdown() {
        log.info("Shutting down stream handler for app " + this.appKey());
        this.doConsume.set(false);
        // Read the field once so the null check and the close() act on the same stream even if
        // the loop swaps in a new one concurrently.
        MobileEventStream current = this.mobileEventStream;
        if (current != null) {
            try {
                current.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /** Returns the app key of the base query descriptor's credentials, used in log messages. */
    private String appKey() {
        return this.baseStreamQueryDescriptor.getCreds().getAppKey();
    }

    /** Default {@link StreamSupplier}: constructs a {@link MobileEventStream} directly. */
    private static class MobileEventStreamSupplier
    implements StreamSupplier {
        private MobileEventStreamSupplier() {
        }

        @Override
        public MobileEventStream get(StreamQueryDescriptor descriptor, AsyncHttpClient client, Consumer<String> eventConsumer, String url, FatalExceptionHandler fatalExceptionHandler) {
            return new MobileEventStream(descriptor, client, eventConsumer, url, fatalExceptionHandler);
        }
    }

    /**
     * Fluent builder for {@link MobileEventConsumerService}.
     *
     * <p>No value is validated here; every field is passed to the service as set. The stream
     * supplier defaults to one that constructs {@link MobileEventStream} instances directly.
     */
    public static final class Builder {
        private Consumer<Event> consumer;
        private AsyncHttpClient client;
        private OffsetManager offsetManager;
        private StreamQueryDescriptor baseStreamQueryDescriptor;
        private Configuration config;
        private StreamSupplier supplier = new MobileEventStreamSupplier();
        private FatalExceptionHandler fatalExceptionHandler;

        private Builder() {
        }

        /**
         * @param consumer consumer handed to the service's {@link RawEventReceiver}
         * @return this builder
         */
        public Builder setConsumer(Consumer<Event> consumer) {
            this.consumer = consumer;
            return this;
        }

        /**
         * @param client HTTP client passed to the stream supplier; the service closes it when
         *               its consuming loop exits
         * @return this builder
         */
        public Builder setClient(AsyncHttpClient client) {
            this.client = client;
            return this;
        }

        /**
         * @param offsetManager manager updated after each connected iteration and used to build
         *                      descriptors for attempts after the first
         * @return this builder
         */
        public Builder setOffsetManager(OffsetManager offsetManager) {
            this.offsetManager = offsetManager;
            return this;
        }

        /**
         * @param descriptor descriptor used for the first attempt and as the base for later ones
         * @return this builder
         */
        public Builder setBaseStreamQueryDescriptor(StreamQueryDescriptor descriptor) {
            this.baseStreamQueryDescriptor = descriptor;
            return this;
        }

        /**
         * @param config configuration wrapped in a {@link ConnectClientConfiguration} by the
         *               service
         * @return this builder
         */
        public Builder setConfig(Configuration config) {
            this.config = config;
            return this;
        }

        /**
         * Replaces the default stream supplier.
         *
         * @param supplier factory used to create the stream for each attempt
         * @return this builder
         */
        @VisibleForTesting
        public Builder setSupplier(StreamSupplier supplier) {
            this.supplier = supplier;
            return this;
        }

        /**
         * @param handler handler notified when the stream cannot be connected; also passed to
         *                the stream supplier
         * @return this builder
         */
        public Builder setFatalExceptionHandler(FatalExceptionHandler handler) {
            this.fatalExceptionHandler = handler;
            return this;
        }

        /**
         * Builds the service from the values set so far.
         *
         * @return a new {@link MobileEventConsumerService}
         */
        public MobileEventConsumerService build() {
            return new MobileEventConsumerService(this.consumer, this.client, this.offsetManager, this.baseStreamQueryDescriptor, this.config, this.supplier, this.fatalExceptionHandler);
        }
    }
}

