package com.github.brainlag.nsq;

import com.github.brainlag.nsq.callbacks.NSQErrorCallback;
import com.github.brainlag.nsq.callbacks.NSQMessageCallback;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.lookup.NSQLookup;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.Closeable;
import java.io.IOException;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;

/* loaded from: input_file:com/github/brainlag/nsq/NSQConsumer.class */
/**
 * Consumes messages from one NSQ topic/channel pair.
 *
 * <p>The consumer asks the supplied {@link NSQLookup} for the addresses serving the topic, keeps
 * one {@link Connection} per address, and hands every received message to the configured
 * {@link NSQMessageCallback} on an {@link ExecutorService}. After {@link #start()} the lookup is
 * repeated every {@code lookupPeriod} milliseconds on the scheduler.
 *
 * <p>The tuning setters ({@link #setMessagesPerBatch(int)}, {@link #setLookupPeriod(long)},
 * {@link #setExecutor(ExecutorService)}, {@link #setScheduledExecutor(ScheduledExecutorService)})
 * are silently ignored once the consumer has been started.
 */
public class NSQConsumer implements Closeable {
    private final NSQLookup lookup;
    private final String topic;
    private final String channel;
    private final NSQMessageCallback callback;
    private final NSQErrorCallback errorCallback;
    private final NSQConfig config;
    private volatile long nextTimeout;
    /** Live connections keyed by server address; never contains {@code null} values. */
    private final Map<ServerAddress, Connection> connections;
    private final AtomicLong totalMessages;
    private boolean started;
    private int messagesPerBatch;
    private long lookupPeriod;
    private ScheduledExecutorService scheduler;
    private ExecutorService executor;
    private Optional<ScheduledFuture<?>> timeout;

    /**
     * Creates a consumer with a default {@link NSQConfig} and no error callback.
     *
     * @param nSQLookup lookup used to resolve the addresses serving the topic
     * @param str topic name
     * @param str2 channel name
     * @param nSQMessageCallback callback invoked for every message; if {@code null}, messages are dropped
     */
    public NSQConsumer(NSQLookup nSQLookup, String str, String str2, NSQMessageCallback nSQMessageCallback) {
        this(nSQLookup, str, str2, nSQMessageCallback, new NSQConfig());
    }

    /**
     * Creates a consumer with an explicit configuration and no error callback.
     *
     * @param nSQLookup lookup used to resolve the addresses serving the topic
     * @param str topic name
     * @param str2 channel name
     * @param nSQMessageCallback callback invoked for every message; if {@code null}, messages are dropped
     * @param nSQConfig configuration passed to every {@link Connection} this consumer creates
     */
    public NSQConsumer(NSQLookup nSQLookup, String str, String str2, NSQMessageCallback nSQMessageCallback, NSQConfig nSQConfig) {
        this(nSQLookup, str, str2, nSQMessageCallback, nSQConfig, null);
    }

    /**
     * Creates a consumer with an explicit configuration and error callback.
     *
     * <p>Defaults: 200 messages per batch, a lookup period of 60000 ms, a single-thread scheduler
     * and a cached thread pool for callbacks.
     *
     * @param nSQLookup lookup used to resolve the addresses serving the topic
     * @param str topic name
     * @param str2 channel name
     * @param nSQMessageCallback callback invoked for every message; if {@code null}, messages are dropped
     * @param nSQConfig configuration passed to every {@link Connection} this consumer creates
     * @param nSQErrorCallback error callback installed on every connection; may be {@code null}
     */
    public NSQConsumer(NSQLookup nSQLookup, String str, String str2, NSQMessageCallback nSQMessageCallback, NSQConfig nSQConfig, NSQErrorCallback nSQErrorCallback) {
        this.nextTimeout = 0L;
        // Concurrent map: connect() runs on the scheduler thread while shutdown() iterates the
        // connections from the caller's thread.
        this.connections = Maps.newConcurrentMap();
        this.totalMessages = new AtomicLong(0L);
        this.started = false;
        this.messagesPerBatch = 200;
        this.lookupPeriod = 60000L;
        this.scheduler = Executors.newSingleThreadScheduledExecutor();
        this.executor = Executors.newCachedThreadPool();
        this.timeout = Optional.empty();
        this.lookup = nSQLookup;
        this.topic = str;
        this.channel = str2;
        this.config = nSQConfig;
        this.callback = nSQMessageCallback;
        this.errorCallback = nSQErrorCallback;
    }

    /**
     * Connects to the currently known servers and schedules the periodic re-lookup.
     * Calling this again after the consumer has been started is a no-op.
     *
     * @return this consumer, for chaining
     */
    public NSQConsumer start() {
        if (!this.started) {
            this.started = true;
            connect();
            this.scheduler.scheduleAtFixedRate(() -> {
                try {
                    connect();
                } catch (RuntimeException e) {
                    // scheduleAtFixedRate suppresses all later runs once a run throws, so a
                    // single failed lookup must not escape from this task.
                    LogManager.getLogger(this).warn("Periodic NSQ lookup failed, retrying on next period", e);
                }
            }, this.lookupPeriod, this.lookupPeriod, TimeUnit.MILLISECONDS);
        }
        return this;
    }

    /**
     * Opens a connection to the given server, subscribes it to the topic/channel and sends the
     * initial {@code RDY} with the configured batch size.
     *
     * @param serverAddress server to connect to
     * @return the new connection, or {@code null} if a {@link NoConnectionsException} was raised
     */
    private Connection createConnection(ServerAddress serverAddress) {
        try {
            Connection connection = new Connection(serverAddress, this.config);
            connection.setConsumer(this);
            connection.setErrorCallback(this.errorCallback);
            connection.command(NSQCommand.instance("SUB " + this.topic + " " + this.channel));
            connection.command(NSQCommand.instance("RDY " + this.messagesPerBatch));
            return connection;
        } catch (NoConnectionsException e) {
            LogManager.getLogger(this).warn("Could not create connection to server " + serverAddress, e);
            return null;
        }
    }

    /**
     * Dispatches a received message to the callback on the executor and updates flow control.
     *
     * <p>If the executor rejects the task the message is requeued and the timeout is updated by
     * +500; if a timeout is pending and the task was accepted, it is updated by -500. Every
     * message (including dropped ones) is counted, and while the running total modulo the batch
     * size exceeds half the batch size a new {@code RDY} for a full batch is sent.
     *
     * @param nSQMessage the message received from a connection
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void processMessage(NSQMessage nSQMessage) {
        if (this.callback == null) {
            LogManager.getLogger(this).warn("NO Callback, dropping message: " + nSQMessage);
        } else {
            try {
                this.executor.execute(() -> {
                    this.callback.message(nSQMessage);
                });
                if (this.nextTimeout > 0) {
                    updateTimeout(nSQMessage, -500L);
                }
            } catch (RejectedExecutionException e) {
                LogManager.getLogger(this).trace("Backing off");
                nSQMessage.requeue();
                updateTimeout(nSQMessage, 500L);
            }
        }
        long total = this.totalMessages.incrementAndGet();
        if (total % this.messagesPerBatch > this.messagesPerBatch / 2) {
            rdy(nSQMessage, this.messagesPerBatch);
        }
    }

    /**
     * Halts the flow on the message's connection ({@code RDY 0}), cancels any pending resume
     * task and, if {@link #calculateTimeoutDate(long)} yields a value, schedules a {@code RDY 1}.
     *
     * @param nSQMessage message whose connection is throttled
     * @param j change applied to the accumulated timeout
     */
    private void updateTimeout(NSQMessage nSQMessage, long j) {
        rdy(nSQMessage, 0);
        LogManager.getLogger(this).trace("RDY 0! Halt Flow.");
        this.timeout.ifPresent(pending -> pending.cancel(true));
        // NOTE(review): the returned Date is only used as a null check and the resume task is
        // scheduled with a delay of 0 ms - confirm this is the intended backoff behaviour.
        if (calculateTimeoutDate(j) != null) {
            this.timeout = Optional.of(this.scheduler.schedule(() -> {
                rdy(nSQMessage, 1);
            }, 0L, TimeUnit.MILLISECONDS));
        }
    }

    /** Sends {@code RDY <i>} on the connection the message arrived on. */
    private void rdy(NSQMessage nSQMessage, int i) {
        nSQMessage.getConnection().command(NSQCommand.instance("RDY " + i));
    }

    /**
     * Applies {@code j} to the accumulated timeout.
     *
     * @param j change to apply
     * @return the current date if {@code (now - nextTimeout) + j > 50}, after adding {@code j}
     *     to {@code nextTimeout}; otherwise {@code null}, after resetting {@code nextTimeout} to 0
     */
    private Date calculateTimeoutDate(long j) {
        if ((System.currentTimeMillis() - this.nextTimeout) + j > 50) {
            this.nextTimeout += j;
            return new Date();
        }
        this.nextTimeout = 0L;
        return null;
    }

    /**
     * Stops the scheduler and sends {@code CLS} to every connection.
     *
     * @throws IllegalStateException if a server answers {@code CLS} with an {@code E_INVALID} error
     */
    public void shutdown() {
        this.scheduler.shutdown();
        cleanClose();
    }

    /**
     * Sends {@code CLS} to every connection and waits for each reply. A timeout on one
     * connection is logged and does not prevent the remaining connections from being notified.
     *
     * @throws IllegalStateException if a reply is an error frame whose message starts with {@code E_INVALID}
     */
    private void cleanClose() {
        NSQCommand instance = NSQCommand.instance("CLS");
        for (Connection connection : this.connections.values()) {
            try {
                NSQFrame reply = connection.commandAndWait(instance);
                if (reply instanceof ErrorFrame) {
                    String errorMessage = ((ErrorFrame) reply).getErrorMessage();
                    if (errorMessage.startsWith("E_INVALID")) {
                        throw new IllegalStateException(errorMessage);
                    }
                }
            } catch (TimeoutException e) {
                LogManager.getLogger(this).warn("No clean disconnect", e);
            }
        }
    }

    /**
     * Sets the batch size used for {@code RDY}; ignored once the consumer has been started.
     *
     * @param i messages per batch
     * @return this consumer, for chaining
     */
    public NSQConsumer setMessagesPerBatch(int i) {
        if (!this.started) {
            this.messagesPerBatch = i;
        }
        return this;
    }

    /**
     * Sets the period between lookups in milliseconds; ignored once the consumer has been started.
     *
     * @param j lookup period in milliseconds
     * @return this consumer, for chaining
     */
    public NSQConsumer setLookupPeriod(long j) {
        if (!this.started) {
            this.lookupPeriod = j;
        }
        return this;
    }

    /**
     * Reconciles the open connections with the addresses currently returned by the lookup:
     * forgets connections that are no longer connected, closes connections to servers that are
     * no longer listed, and connects to newly listed servers. If the lookup returns no addresses
     * the existing connections are left as they are.
     */
    private void connect() {
        this.connections.values().removeIf(connection -> !connection.isConnected());

        Set<ServerAddress> lookupAddresses = lookupAddresses();
        Set<ServerAddress> connected = this.connections.keySet();
        LogManager.getLogger(this).debug("Addresses NSQ connected to: " + lookupAddresses);
        if (lookupAddresses.isEmpty()) {
            LogManager.getLogger(this).warn("No NSQLookup server connections or topic does not exist.");
            return;
        }

        // Sets.difference returns a live view over keySet(); copy it before mutating the map.
        for (ServerAddress stale : Sets.newHashSet(Sets.difference(connected, lookupAddresses))) {
            LogManager.getLogger(this).info("Remove connection " + stale.toString());
            Connection removed = this.connections.remove(stale);
            if (removed != null) {
                removed.close();
            }
        }

        for (ServerAddress added : Sets.newHashSet(Sets.difference(lookupAddresses, connected))) {
            Connection connection = createConnection(added);
            // createConnection returns null on failure; never store that in the map. The next
            // lookup round retries the address.
            if (connection != null) {
                this.connections.put(added, connection);
            }
        }
    }

    /**
     * @return the number of messages passed to {@link #processMessage(NSQMessage)} so far
     */
    public long getTotalMessages() {
        return this.totalMessages.get();
    }

    /**
     * Replaces the executor that runs the message callback; ignored once the consumer has been started.
     *
     * @param executorService executor to run callbacks on
     * @return this consumer, for chaining
     */
    public NSQConsumer setExecutor(ExecutorService executorService) {
        if (!this.started) {
            this.executor = executorService;
        }
        return this;
    }

    /** Asks the lookup for the addresses serving this consumer's topic. */
    private Set<ServerAddress> lookupAddresses() {
        return this.lookup.lookup(this.topic);
    }

    /**
     * Schedules a task at a fixed rate on this consumer's scheduler.
     *
     * @param runnable task to run
     * @param i initial delay
     * @param i2 period between runs
     * @param timeUnit unit of {@code i} and {@code i2}
     * @return the future returned by the scheduler
     */
    public ScheduledFuture<?> scheduleRun(Runnable runnable, int i, int i2, TimeUnit timeUnit) {
        return this.scheduler.scheduleAtFixedRate(runnable, i, i2, timeUnit);
    }

    /**
     * Replaces the scheduler used for lookups and timeouts; ignored once the consumer has been started.
     *
     * @param scheduledExecutorService scheduler to use
     * @return this consumer, for chaining
     */
    public NSQConsumer setScheduledExecutor(ScheduledExecutorService scheduledExecutorService) {
        if (!this.started) {
            this.scheduler = scheduledExecutorService;
        }
        return this;
    }

    /**
     * Equivalent to {@link #shutdown()}.
     *
     * @throws IOException declared by {@link Closeable}; not thrown by the code in this class
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        shutdown();
    }
}
