package org.commonjava.couch.change;

import com.google.gson.annotations.SerializedName;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import javax.inject.Inject;
import javax.inject.Singleton;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.commonjava.couch.change.dispatch.CouchChangeDispatcher;
import org.commonjava.couch.conf.CouchDBConfiguration;
import org.commonjava.couch.db.CouchDBException;
import org.commonjava.couch.db.CouchManager;
import org.commonjava.couch.io.CouchHttpClient;
import org.commonjava.couch.io.Serializer;
import org.commonjava.couch.model.AbstractCouchDocument;
import org.commonjava.couch.model.CouchDocRef;
import org.commonjava.couch.util.UrlUtils;
import org.commonjava.util.logging.Logger;

@Singleton
/* loaded from: input_file:org/commonjava/couch/change/CouchChangeListener.class */
public class CouchChangeListener implements Runnable {
    private static final String CHANGE_LISTENER_DOCID = "change-listener-metadata";
    private static final String CHANGES_SERVICE = "_changes";

    @Inject
    private CouchChangeDispatcher dispatcher;

    @Inject
    private CouchDBConfiguration config;

    @Inject
    private CouchHttpClient http;

    @Inject
    private CouchManager couch;

    @Inject
    private Serializer serializer;
    private ChangeListenerMetadata metadata;
    private Thread listenerThread;
    private final Logger logger = new Logger(getClass());
    private boolean running = false;
    private final Object internalLock = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/commonjava/couch/change/CouchChangeListener$ChangeListenerMetadata.class */
    public static final class ChangeListenerMetadata extends AbstractCouchDocument {

        @SerializedName("last_seq")
        private int lastProcessedSequenceId;

        ChangeListenerMetadata() {
            setCouchDocId(CouchChangeListener.CHANGE_LISTENER_DOCID);
        }

        public Map<String, String> getUrlParameters() {
            HashMap hashMap = new HashMap();
            if (this.lastProcessedSequenceId > 0) {
                hashMap.put("since", Integer.toString(this.lastProcessedSequenceId));
            }
            return hashMap;
        }

        int getLastProcessedSequenceId() {
            return this.lastProcessedSequenceId;
        }

        void setLastProcessedSequenceId(int i) {
            this.lastProcessedSequenceId = i;
        }
    }

    public CouchChangeListener() {
    }

    public CouchChangeListener(CouchChangeDispatcher couchChangeDispatcher, CouchHttpClient couchHttpClient, CouchDBConfiguration couchDBConfiguration, CouchManager couchManager, Serializer serializer) {
        this.dispatcher = couchChangeDispatcher;
        this.http = couchHttpClient;
        this.config = couchDBConfiguration;
        this.couch = couchManager;
        this.serializer = serializer;
    }

    public void startup() throws CouchDBException {
        startup(true);
    }

    public void startup(boolean z) throws CouchDBException {
        this.metadata = (ChangeListenerMetadata) this.couch.getDocument(new CouchDocRef(CHANGE_LISTENER_DOCID), ChangeListenerMetadata.class);
        if (this.metadata == null) {
            this.metadata = new ChangeListenerMetadata();
        }
        this.listenerThread = new Thread(this);
        this.listenerThread.setDaemon(true);
        this.listenerThread.start();
        if (z) {
            synchronized (this.internalLock) {
                while (!this.running) {
                    this.logger.info("Waiting for change listener to startup...", new Object[0]);
                    try {
                        this.internalLock.wait(100L);
                    } catch (InterruptedException e) {
                        this.logger.info("Interrupted...", new Object[0]);
                    }
                }
            }
        }
    }

    public void shutdown() throws CouchDBException {
        if (this.listenerThread != null) {
            this.listenerThread.interrupt();
            while (this.listenerThread.isAlive()) {
                this.logger.info("Waiting for change-listener shutdown...", new Object[0]);
                synchronized (this.internalLock) {
                    try {
                        this.internalLock.wait(2000L);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
        if (this.metadata != null && this.metadata.getLastProcessedSequenceId() < 1) {
            this.couch.store(this.metadata, false);
        }
        this.running = false;
        synchronized (this) {
            notifyAll();
        }
    }

    public boolean isRunning() {
        return this.running;
    }

    @Override // java.lang.Runnable
    public void run() {
        CouchDocChangeDeserializer couchDocChangeDeserializer = new CouchDocChangeDeserializer();
        while (true) {
            if (Thread.interrupted()) {
                break;
            }
            try {
                HttpGet httpGet = new HttpGet(UrlUtils.buildUrl(this.config.getDatabaseUrl(), this.metadata.getUrlParameters(), CHANGES_SERVICE));
                try {
                    try {
                        try {
                            HttpResponse executeHttpWithResponse = this.http.executeHttpWithResponse(httpGet, "Failed to open changes stream.");
                            if (executeHttpWithResponse.getEntity() == null) {
                                this.logger.error("Changes stream did not return a response body.", new Object[0]);
                                this.http.cleanup(httpGet);
                                break;
                            }
                            Header contentEncoding = executeHttpWithResponse.getEntity().getContentEncoding();
                            String value = contentEncoding == null ? "UTF-8" : contentEncoding.getValue();
                            InputStream content = executeHttpWithResponse.getEntity().getContent();
                            this.running = true;
                            synchronized (this.internalLock) {
                                this.internalLock.notifyAll();
                            }
                            Iterator<CouchDocChange> it = ((CouchDocChangeList) this.serializer.fromJson(content, value, CouchDocChangeList.class, couchDocChangeDeserializer)).iterator();
                            while (it.hasNext()) {
                                CouchDocChange next = it.next();
                                if (!next.getId().equals(CHANGE_LISTENER_DOCID)) {
                                    this.metadata.setLastProcessedSequenceId(next.getSequence());
                                    this.dispatcher.documentChanged(next);
                                }
                            }
                            this.http.cleanup(httpGet);
                            try {
                                Thread.sleep(2000L);
                            } catch (InterruptedException e) {
                            }
                        } catch (Throwable th) {
                            this.http.cleanup(httpGet);
                            throw th;
                        }
                    } catch (UnsupportedEncodingException e2) {
                        this.logger.error("Invalid content encoding for changes response: %s. Reason: %s", e2, new Object[]{null, e2.getMessage()});
                        this.http.cleanup(httpGet);
                    }
                } catch (IOException e3) {
                    this.logger.error("Error reading changes response content. Reason: %s", e3, new Object[]{e3.getMessage()});
                    this.http.cleanup(httpGet);
                } catch (CouchDBException e4) {
                    this.logger.error("Failed to read changes stream for db: %s. Reason: %s", e4, new Object[]{this.config.getDatabaseUrl(), e4.getMessage()});
                    this.http.cleanup(httpGet);
                }
            } catch (MalformedURLException e5) {
                this.logger.error("Failed to construct changes URL for db: %s. Reason: %s", e5, new Object[]{this.config.getDatabaseUrl(), e5.getMessage()});
            }
        }
        synchronized (this.internalLock) {
            this.internalLock.notifyAll();
        }
    }
}
