package io.polaris.concurrent.pool;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.SleepingWaitStrategy;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import io.polaris.core.concurrent.pool.ConsumerDelegates;
import io.polaris.core.concurrent.pool.ResourceableConsumer;
import io.polaris.core.concurrent.pool.RunnableStatistics;
import io.polaris.core.concurrent.pool.RunnableStatisticsHolder;
import io.polaris.core.concurrent.pool.TransactionConsumer;
import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/concurrent/pool/DisruptorPooledExecutor.class */
public class DisruptorPooledExecutor<E> implements RunnableStatisticsHolder {
    private static final ILogger log = ILoggers.of(DisruptorPooledExecutor.class);
    private RunnableStatistics statistics;
    private String name;
    private Disruptor<PooledEvent> disruptor;
    private int errorLimit = -1;
    private boolean openStatistics = false;
    private volatile boolean running = false;
    private List<Consumer<E>> consumers = new ArrayList();
    private int ringBufferSize = 4096;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/polaris/concurrent/pool/DisruptorPooledExecutor$PooledEvent.class */
    public static class PooledEvent<E> {
        private E data;

        PooledEvent() {
        }

        public E getData() {
            return this.data;
        }

        public void setData(E e) {
            this.data = e;
        }
    }

    /* loaded from: input_file:io/polaris/concurrent/pool/DisruptorPooledExecutor$PooledEventFactory.class */
    static class PooledEventFactory<E> implements EventFactory<PooledEvent<E>> {
        PooledEventFactory() {
        }

        /* renamed from: newInstance, reason: merged with bridge method [inline-methods] */
        public PooledEvent<E> m1newInstance() {
            return new PooledEvent<>();
        }
    }

    /* loaded from: input_file:io/polaris/concurrent/pool/DisruptorPooledExecutor$PooledEventHandler.class */
    static class PooledEventHandler<E> implements WorkHandler<PooledEvent<E>> {
        private Consumer<E> consumer;

        public PooledEventHandler(Consumer<E> consumer) {
            this.consumer = consumer;
        }

        public void onEvent(PooledEvent<E> pooledEvent) throws Exception {
            this.consumer.accept(((PooledEvent) pooledEvent).data);
        }
    }

    /* loaded from: input_file:io/polaris/concurrent/pool/DisruptorPooledExecutor$PooledThreadFactory.class */
    static class PooledThreadFactory implements ThreadFactory {
        private static AtomicLong pool = new AtomicLong(0);
        private AtomicLong seq = new AtomicLong(0);
        private long poolId;
        private String name;

        public PooledThreadFactory(String str) {
            this.poolId = 0L;
            this.name = "disruptor-pooled";
            this.poolId = pool.incrementAndGet();
            if (str == null || str.length() <= 0) {
                return;
            }
            this.name = str;
        }

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable);
            thread.setName(this.name + (this.poolId > 0 ? "-" + this.poolId + "-" : "-") + this.seq.incrementAndGet());
            return thread;
        }
    }

    public RunnableStatistics runnableStatistics() {
        return this.statistics;
    }

    public void setRingBufferSize(int i) {
        if (i <= 0) {
            return;
        }
        int i2 = 16;
        while (true) {
            int i3 = i2;
            if (i3 >= i) {
                this.ringBufferSize = i3;
                return;
            }
            i2 = i3 << 1;
        }
    }

    private void addConsumerInner(Consumer<E> consumer) {
        this.consumers.add(consumer);
    }

    public void addConsumer(Consumer<E> consumer) {
        addConsumer(1, consumer);
    }

    public <Resource> void addConsumer(TransactionConsumer<E, Resource> transactionConsumer) {
        addConsumer(1, transactionConsumer);
    }

    public void addConsumer(int i, Consumer<E> consumer) {
        for (int i2 = 0; i2 < i; i2++) {
            addConsumerInner(ConsumerDelegates.createDelegate(this, consumer));
        }
    }

    public <Resource> void addConsumer(int i, TransactionConsumer<E, Resource> transactionConsumer) {
        for (int i2 = 0; i2 < i; i2++) {
            addConsumerInner(ConsumerDelegates.createDelegate(this, transactionConsumer));
        }
    }

    public boolean isExceedErrorLimit() {
        return this.statistics != null && this.statistics.isExceedErrorLimit();
    }

    public void start() {
        if (this.running) {
            throw new IllegalStateException("正在运行中");
        }
        this.running = true;
        if (this.openStatistics || this.errorLimit >= 0) {
            this.statistics = new RunnableStatistics(this.errorLimit);
        }
        int size = this.consumers.size();
        PooledEventHandler[] pooledEventHandlerArr = new PooledEventHandler[size];
        for (int i = 0; i < size; i++) {
            ResourceableConsumer resourceableConsumer = (Consumer) this.consumers.get(i);
            if (resourceableConsumer instanceof ResourceableConsumer) {
                resourceableConsumer.open();
            }
            pooledEventHandlerArr[i] = new PooledEventHandler(resourceableConsumer);
        }
        Disruptor<PooledEvent> disruptor = new Disruptor<>(new PooledEventFactory(), this.ringBufferSize, new PooledThreadFactory(this.name), ProducerType.SINGLE, new SleepingWaitStrategy());
        disruptor.handleEventsWithWorkerPool(pooledEventHandlerArr);
        disruptor.start();
        this.disruptor = disruptor;
    }

    public void offer(Iterable<E> iterable) {
        Iterator<E> it = iterable.iterator();
        while (it.hasNext()) {
            offer((DisruptorPooledExecutor<E>) it.next());
        }
    }

    public void offer(E... eArr) {
        for (E e : eArr) {
            offer((DisruptorPooledExecutor<E>) e);
        }
    }

    public void offer(E e) {
        if (!this.running) {
            throw new IllegalStateException("状态已停止");
        }
        if (isExceedErrorLimit()) {
            throw new IllegalStateException("处理失败数量超限(" + getErrorLimit() + ")");
        }
        this.disruptor.publishEvent(this::translateTo, e);
    }

    public void shutdown() {
        try {
            this.running = false;
            log.info("shutdown...");
            this.disruptor.shutdown();
            int size = this.consumers.size();
            for (int i = 0; i < size; i++) {
                ResourceableConsumer resourceableConsumer = (Consumer) this.consumers.get(i);
                if (resourceableConsumer instanceof ResourceableConsumer) {
                    resourceableConsumer.close();
                }
            }
            log.info("shutdown!");
        } catch (Exception e) {
            log.error("", e);
        }
    }

    void translateTo(PooledEvent pooledEvent, long j, E e) {
        pooledEvent.setData(e);
    }

    public void setErrorLimit(int i) {
        this.errorLimit = i;
    }

    public int getErrorLimit() {
        return this.errorLimit;
    }

    public void setOpenStatistics(boolean z) {
        this.openStatistics = z;
    }

    public boolean isOpenStatistics() {
        return this.openStatistics;
    }

    public void setName(String str) {
        this.name = str;
    }

    public String getName() {
        return this.name;
    }
}
