package io.axual.client.producer.generic;

import io.axual.client.config.BaseProducerConfig;
import io.axual.common.config.ClientConfig;
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axual/client/producer/generic/ProducerWorkerManager.class */
/**
 * Registry that maps each {@link GenericProducer} to the {@link ProducerWorker} started for it.
 *
 * <p>Workers are created and started on the first {@link #claimWorker} call for a producer and
 * are cancelled and joined by {@link #releaseWorker} or {@link #close()}. All three operations
 * are {@code synchronized} on this instance, so a worker cannot be registered while the manager
 * is shutting down.
 */
public final class ProducerWorkerManager implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ProducerWorkerManager.class);

    /** Maximum time to wait for a single worker to terminate after it was cancelled. */
    private static final Duration JOIN_TIMEOUT = Duration.ofSeconds(10);

    private final Map<GenericProducer<?, ?>, ProducerWorker<?, ?>> registry = new ConcurrentHashMap<>();
    private final AtomicBoolean isClosed = new AtomicBoolean(false);

    /**
     * Returns the worker registered for the given producer, creating and starting a new one
     * when the producer has no worker yet.
     *
     * @param clientConfig       client configuration passed to a newly created worker
     * @param genericProducer    producer that owns the worker; used as the registry key
     * @param baseProducerConfig producer configuration passed to a newly created worker
     * @param <K>                key type of the producer
     * @param <V>                value type of the producer
     * @return the existing or newly started worker for {@code genericProducer}
     * @throws IllegalStateException if this manager has already been closed
     */
    public synchronized <K, V> ProducerWorker<K, V> claimWorker(ClientConfig clientConfig, GenericProducer<K, V> genericProducer, BaseProducerConfig<K, V> baseProducerConfig) {
        if (this.isClosed.get()) {
            throw new IllegalStateException("The ProducerWorkerManager is already closed");
        }
        if (LOG.isInfoEnabled()) {
            LOG.info(String.format("Claiming worker\tProducer                 : %s\tSink                     : %s\tDeliveryStrategy         : %s\tOrderingStrategy         : %s\tMessageBufferSize        : %d\tmessageBufferWaitTimeout : %d\tblockedMessageInsert     : %s", genericProducer, baseProducerConfig, baseProducerConfig.getDeliveryStrategy(), baseProducerConfig.getOrderingStrategy(), Integer.valueOf(baseProducerConfig.getMessageBufferSize()), Integer.valueOf(baseProducerConfig.getMessageBufferWaitTimeout()), Boolean.valueOf(baseProducerConfig.isBlocking())));
        }

        // The only entries stored under a GenericProducer<K, V> key are ProducerWorker<K, V>
        // instances created below, so narrowing the wildcard type here is safe.
        @SuppressWarnings("unchecked")
        ProducerWorker<K, V> existingWorker = (ProducerWorker<K, V>) this.registry.get(genericProducer);
        if (existingWorker != null) {
            LOG.debug("Producer found in registry");
            return existingWorker;
        }

        LOG.debug("Producer not found in registry, starting new ProducerWorker");
        ProducerWorker<K, V> producerWorker = new ProducerWorker<>(clientConfig, baseProducerConfig);
        producerWorker.start();
        this.registry.put(genericProducer, producerWorker);
        return producerWorker;
    }

    /**
     * Cancels the worker registered for the given producer, waits for it to stop and removes
     * it from the registry. Does nothing besides logging when the producer is not registered.
     *
     * @param genericProducer producer whose worker should be released
     * @param <K>             key type of the producer
     * @param <V>             value type of the producer
     */
    public synchronized <K, V> void releaseWorker(GenericProducer<K, V> genericProducer) {
        LOG.info("Releasing producer {}", genericProducer);
        ProducerWorker<?, ?> producerWorker = this.registry.get(genericProducer);
        if (producerWorker == null) {
            LOG.debug("Producer not found in registry");
            return;
        }
        LOG.debug("Send ProducerWorker cancel command");
        stopWorker(producerWorker);
        this.registry.remove(genericProducer);
    }

    /**
     * Marks the manager as closed, then cancels and joins every registered worker and empties
     * the registry. Subsequent calls only log a warning.
     */
    @Override // java.lang.AutoCloseable
    public synchronized void close() {
        if (!this.isClosed.compareAndSet(false, true)) {
            LOG.warn("ProducerWorkerManager already closed, ignoring...");
            return;
        }
        LOG.info("Shutting down ProducerWorkerManager, cancelling all workers");
        for (ProducerWorker<?, ?> producerWorker : this.registry.values()) {
            stopWorker(producerWorker);
        }
        // Drop the references so a later releaseWorker() does not cancel/join the same worker again.
        this.registry.clear();
    }

    /**
     * Cancels the worker and waits up to {@link #JOIN_TIMEOUT} for it to finish, logging how
     * long the wait took. If the calling thread is interrupted while waiting, the wait is
     * abandoned and the thread's interrupt status is restored.
     *
     * @param producerWorker worker to cancel and join
     */
    private void stopWorker(ProducerWorker<?, ?> producerWorker) {
        producerWorker.cancel();
        // nanoTime is monotonic, so the measured duration is unaffected by wall-clock adjustments.
        long startNanos = System.nanoTime();
        try {
            producerWorker.join(JOIN_TIMEOUT.toMillis());
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting for ProducerWorker to stop, ignoring");
            Thread.currentThread().interrupt();
        }
        long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        LOG.info("Stopping producer worker took {}", Duration.ofMillis(elapsedMillis));
    }
}
