package com.github.brainlag.nsq;

import com.github.brainlag.nsq.exceptions.BadMessageException;
import com.github.brainlag.nsq.exceptions.BadTopicException;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.exceptions.NoConnectionsException;
import com.github.brainlag.nsq.frames.ErrorFrame;
import com.github.brainlag.nsq.frames.NSQFrame;
import com.github.brainlag.nsq.pool.ConnectionPoolFactory;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
import org.apache.commons.pool2.impl.GenericKeyedObjectPoolConfig;

/* loaded from: input_file:com/github/brainlag/nsq/NSQProducer.class */
public class NSQProducer {
    private GenericKeyedObjectPool<ServerAddress, Connection> pool;
    private Set<ServerAddress> addresses = Sets.newConcurrentHashSet();
    private int roundRobinCount = 0;
    private volatile boolean started = false;
    private ExecutorService executor = Executors.newCachedThreadPool();
    private GenericKeyedObjectPoolConfig poolConfig = null;
    private NSQConfig config = new NSQConfig();
    private int connectionRetries = 5;

    public NSQProducer start() {
        if (!this.started) {
            this.started = true;
            createPool();
        }
        return this;
    }

    private void createPool() {
        if (this.poolConfig == null) {
            this.poolConfig = new GenericKeyedObjectPoolConfig();
            this.poolConfig.setTestOnBorrow(true);
            this.poolConfig.setJmxEnabled(false);
        }
        this.pool = new GenericKeyedObjectPool<>(new ConnectionPoolFactory(this.config), this.poolConfig);
    }

    protected Connection getConnection() throws NoConnectionsException {
        while (0 < this.connectionRetries) {
            ServerAddress[] serverAddressArr = (ServerAddress[]) this.addresses.toArray(new ServerAddress[this.addresses.size()]);
            if (serverAddressArr.length != 0) {
                try {
                    GenericKeyedObjectPool<ServerAddress, Connection> genericKeyedObjectPool = this.pool;
                    int i = this.roundRobinCount;
                    this.roundRobinCount = i + 1;
                    return (Connection) genericKeyedObjectPool.borrowObject(serverAddressArr[i % serverAddressArr.length]);
                } catch (NoSuchElementException e) {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e2) {
                        throw new NoConnectionsException("Could not acquire a connection to a server", e2);
                    }
                } catch (Exception e3) {
                    throw new NoConnectionsException("Could not acquire a connection to a server", e3);
                }
            }
        }
        throw new IllegalStateException("No server configured for producer");
    }

    public void produceMulti(String str, List<byte[]> list) throws TimeoutException, NSQException {
        if (!this.started) {
            throw new IllegalStateException("Producer must be started before producing messages!");
        }
        if (list == null || list.isEmpty()) {
            return;
        }
        if (list.size() == 1) {
            produce(str, list.get(0));
            return;
        }
        Connection connection = getConnection();
        try {
            NSQCommand instance = NSQCommand.instance("MPUB " + str);
            instance.setData(list);
            NSQFrame commandAndWait = connection.commandAndWait(instance);
            if (commandAndWait instanceof ErrorFrame) {
                String errorMessage = ((ErrorFrame) commandAndWait).getErrorMessage();
                if (errorMessage.startsWith("E_BAD_TOPIC")) {
                    throw new BadTopicException(errorMessage);
                }
                if (errorMessage.startsWith("E_BAD_MESSAGE")) {
                    throw new BadMessageException(errorMessage);
                }
            }
        } finally {
            this.pool.returnObject(connection.getServerAddress(), connection);
        }
    }

    public void produce(String str, byte[] bArr) throws NSQException, TimeoutException {
        if (!this.started) {
            throw new IllegalStateException("Producer must be started before producing messages!");
        }
        Connection connection = getConnection();
        try {
            NSQFrame commandAndWait = connection.commandAndWait(NSQCommand.instance("PUB " + str, bArr));
            if (commandAndWait instanceof ErrorFrame) {
                String errorMessage = ((ErrorFrame) commandAndWait).getErrorMessage();
                if (errorMessage.startsWith("E_BAD_TOPIC")) {
                    throw new BadTopicException(errorMessage);
                }
                if (errorMessage.startsWith("E_BAD_MESSAGE")) {
                    throw new BadMessageException(errorMessage);
                }
            }
        } finally {
            this.pool.returnObject(connection.getServerAddress(), connection);
        }
    }

    public NSQProducer addAddress(String str, int i) {
        this.addresses.add(new ServerAddress(str, i));
        return this;
    }

    public NSQProducer removeAddress(String str, int i) {
        this.addresses.remove(new ServerAddress(str, i));
        return this;
    }

    public NSQProducer setPoolConfig(GenericKeyedObjectPoolConfig genericKeyedObjectPoolConfig) {
        if (!this.started) {
            this.poolConfig = genericKeyedObjectPoolConfig;
        }
        return this;
    }

    public NSQProducer setExecutor(ExecutorService executorService) {
        if (!this.started) {
            this.executor = executorService;
        }
        return this;
    }

    public NSQProducer setConfig(NSQConfig nSQConfig) {
        if (!this.started) {
            this.config = nSQConfig;
        }
        return this;
    }

    protected ExecutorService getExecutor() {
        return this.executor;
    }

    public GenericKeyedObjectPool<ServerAddress, Connection> getPool() {
        return this.pool;
    }

    public void shutdown() {
        this.started = false;
        this.pool.close();
        this.executor.shutdown();
    }
}
