package io.polaris.core.concurrent.pool;

import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/core/concurrent/pool/ConsumerDelegates.class */
/**
 * Factory methods that wrap consumers with statistics bookkeeping and, for
 * {@link TransactionConsumer}s, batched commit/rollback handling.
 */
public class ConsumerDelegates {
    private static final ILogger log = ILoggers.of((Class<?>) ConsumerDelegates.class);

    /** Batch size used when {@code TransactionConsumer.commitCount()} returns a non-positive value. */
    private static final int DEFAULT_COMMIT_COUNT = 1000;

    /**
     * Wraps {@code consumer} so that every invocation is counted in the statistics
     * currently supplied by {@code runnableStatisticsHolder}.
     *
     * <p>The statistics object is looked up on every invocation. When the holder
     * returns {@code null} the call is forwarded unchanged and any exception
     * propagates to the caller. Otherwise the total counter is incremented first,
     * then either the success or the error counter; an {@link Exception} thrown by
     * {@code consumer} is logged and not rethrown.
     *
     * @param runnableStatisticsHolder source of the statistics to update
     * @param consumer                 the consumer to delegate to
     * @param <E>                      element type
     * @return the counting delegate
     */
    public static <E> Consumer<E> createDelegate(RunnableStatisticsHolder runnableStatisticsHolder, Consumer<E> consumer) {
        return element -> {
            RunnableStatistics statistics = runnableStatisticsHolder.runnableStatistics();
            if (statistics == null) {
                consumer.accept(element);
                return;
            }
            statistics.getTotal().incrementAndGet();
            try {
                consumer.accept(element);
                statistics.getSuccess().incrementAndGet();
            } catch (Exception e) {
                statistics.getError().incrementAndGet();
                // Keep the best-effort contract (no rethrow) but leave a trace of the failure.
                log.error("处理失败", e);
            }
        };
    }

    /**
     * Wraps {@code transactionConsumer} in a {@link ResourceableConsumer} that
     * processes elements in batches and commits or rolls back each batch as a unit.
     *
     * <p>Life cycle of the returned delegate:
     * <ul>
     *   <li>{@code open()} reads the batch size (falling back to
     *       {@value #DEFAULT_COMMIT_COUNT} when it is not positive), looks up the
     *       statistics and opens the resource;</li>
     *   <li>{@code accept(e)} processes one element and, once the batch is full,
     *       commits it, or rolls it back if any element of the batch failed;</li>
     *   <li>{@code close()} commits or rolls back the remaining partial batch and
     *       closes the resource.</li>
     * </ul>
     *
     * <p>NOTE(review): the batch bookkeeping is a check-then-act sequence over several
     * atomics; presumably each delegate instance is driven by a single thread — confirm.
     *
     * @param runnableStatisticsHolder source of the (optional) shared statistics
     * @param transactionConsumer      the transactional consumer to delegate to
     * @param <E>                      element type
     * @param <Resource>               type of the transactional resource
     * @return the batching delegate
     */
    public static <E, Resource> ResourceableConsumer<E> createDelegate(final RunnableStatisticsHolder runnableStatisticsHolder, final TransactionConsumer<E, Resource> transactionConsumer) {
        return new ResourceableConsumer<E>() {
            private final TransactionConsumer<E, Resource> consumer = transactionConsumer;
            /** Elements seen by this delegate. */
            private final AtomicInteger total = new AtomicInteger(0);
            /** Elements that belonged to a committed batch. */
            private final AtomicInteger success = new AtomicInteger(0);
            /** Elements that belonged to a rolled-back batch. */
            private final AtomicInteger error = new AtomicInteger(0);
            /** Elements of the current, not yet committed batch. */
            private final AtomicInteger commit = new AtomicInteger(0);
            /** First processing failure of the current batch; later ones are attached as suppressed. */
            private final AtomicReference<Throwable> processError = new AtomicReference<>();
            private RunnableStatistics statistics;
            private int commitCount;
            private volatile Resource resource;

            /**
             * Reads the batch size, looks up the statistics and opens the resource.
             *
             * @throws RuntimeException wrapping any failure raised while opening
             */
            @Override
            public void open() {
                try {
                    int configured = this.consumer.commitCount();
                    this.commitCount = configured > 0 ? configured : DEFAULT_COMMIT_COUNT;
                    this.statistics = runnableStatisticsHolder.runnableStatistics();
                    this.resource = this.consumer.openResource();
                } catch (Throwable th) {
                    throw new RuntimeException(th);
                }
            }

            /**
             * Commits (or rolls back) the pending partial batch, then closes the
             * resource. Does nothing when no resource was opened.
             */
            @Override
            public void close() {
                if (this.resource == null) {
                    return;
                }
                try {
                    if (this.commit.get() > 0) {
                        try {
                            commitBatch();
                        } catch (Throwable th) {
                            rollbackBatch(th);
                        }
                    }
                    ConsumerDelegates.log.trace("处理完成");
                } finally {
                    this.consumer.closeResource(this.resource);
                }
            }

            /**
             * Processes one element and commits the batch once it reaches the
             * configured size. A processing failure does not stop the batch; it is
             * remembered and causes the whole batch to be rolled back at commit time.
             * No exception escapes this method.
             */
            @Override
            public void accept(E e) {
                try {
                    this.total.incrementAndGet();
                    if (this.statistics != null) {
                        this.statistics.getTotal().incrementAndGet();
                    }
                    this.commit.incrementAndGet();
                    try {
                        this.consumer.processData(this.resource, e);
                    } catch (Throwable th) {
                        recordProcessError(th);
                    }
                    if (this.commit.get() >= this.commitCount) {
                        commitBatch();
                    }
                } catch (Throwable th) {
                    rollbackBatch(th);
                }
            }

            /** Remembers the first processing failure of the batch and attaches later ones to it. */
            private void recordProcessError(Throwable failure) {
                Throwable first = this.processError.get();
                if (first == null) {
                    this.processError.set(failure);
                } else if (first != failure) {
                    // addSuppressed rejects self-suppression with IllegalArgumentException.
                    first.addSuppressed(failure);
                }
            }

            /**
             * Commits the current batch and books its elements as successes.
             *
             * @throws Throwable the recorded processing failure of this batch, or
             *                   whatever the commit itself raises
             */
            private void commitBatch() throws Throwable {
                Throwable failure = this.processError.get();
                if (failure != null) {
                    throw failure;
                }
                this.consumer.commitResource(this.resource);
                int committed = this.commit.get();
                this.success.addAndGet(committed);
                if (this.statistics != null) {
                    this.statistics.getSuccess().addAndGet(committed);
                }
                ConsumerDelegates.log.trace("处理/提交成功. total: {}, success: {}, error: {}", this.total.get(), this.success.get(), this.error.get());
                this.commit.set(0);
            }

            /** Books the current batch as failed, resets the batch state and rolls the resource back. */
            private void rollbackBatch(Throwable cause) {
                int failed = this.commit.getAndSet(0);
                this.error.addAndGet(failed);
                if (this.statistics != null) {
                    this.statistics.getError().addAndGet(failed);
                }
                // The failure belongs to the batch being rolled back; forget it so that
                // it does not force a rollback of every following batch as well.
                this.processError.set(null);
                try {
                    ConsumerDelegates.log.trace("处理/提交失败. total: {}, success: {}, error: {}", this.total.get(), this.success.get(), this.error.get());
                    ConsumerDelegates.log.error("处理/提交失败, 准备回滚.", cause);
                    this.consumer.rollbackResource(this.resource);
                } catch (Throwable rollbackFailure) {
                    ConsumerDelegates.log.error("回滚失败", rollbackFailure);
                }
            }
        };
    }
}
