package io.polaris.core.concurrent.pool;

import io.polaris.core.collection.Iterables;
import io.polaris.core.log.ILogger;
import io.polaris.core.log.ILoggers;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/* loaded from: input_file:io/polaris/core/concurrent/pool/RunnableDelegates.class */
/**
 * Factory for the {@link Runnable} workers that drain a {@link RunnableState}.
 *
 * <p>Two flavours are provided: one hands every element to a plain {@link Consumer}, the other
 * feeds elements to a {@link TransactionConsumer} in batches that are committed or rolled back
 * as a unit. Both register themselves on the state through
 * {@code incrementActiveCount()} / {@code decrementActiveCount()} and call
 * {@code notifyFinished()} after each non-null element and once more on exit.
 */
public class RunnableDelegates {
    private static final ILogger log = ILoggers.of((Class<?>) RunnableDelegates.class);

    /** Batch size used when {@code TransactionConsumer.commitCount()} returns a non-positive value. */
    private static final int DEFAULT_COMMIT_COUNT = 1000;

    /**
     * Creates a worker that passes every non-null element of {@code runnableState} to
     * {@code consumer}.
     *
     * <p>A failure while consuming one element does not stop the worker: the error is counted
     * (when statistics are available), reported to the consumer currently held by
     * {@code atomicReference} (if any), logged, and the loop moves on to the next element.
     * {@code notifyFinished()} is called after every non-null element regardless of outcome.
     *
     * @param runnableState   source of elements and holder of the shared counters
     * @param consumer        callback invoked once per non-null element
     * @param atomicReference holder of an optional error callback; read anew on every failure
     * @param <E>             element type
     * @return the worker; nothing is executed until it is run
     */
    public static <E> Runnable createDelegate(RunnableState<E> runnableState, Consumer<E> consumer, AtomicReference<Consumer<ErrorRecords<E>>> atomicReference) {
        return () -> {
            // May be null; every statistics update below is guarded accordingly.
            RunnableStatistics statistics = runnableState.runnableStatistics();
            runnableState.incrementActiveCount();
            try {
                while (runnableState.hasNext()) {
                    E item = runnableState.next();
                    if (item == null) {
                        continue;
                    }
                    if (statistics != null) {
                        statistics.getTotal().incrementAndGet();
                    }
                    try {
                        consumer.accept(item);
                        if (statistics != null) {
                            statistics.getSuccess().incrementAndGet();
                        }
                    } catch (Throwable failure) {
                        if (statistics != null) {
                            statistics.getError().incrementAndGet();
                        }
                        try {
                            Consumer<ErrorRecords<E>> errorHandler = atomicReference.get();
                            if (errorHandler != null) {
                                ErrorRecord<E> record = new ErrorRecord<>(item, failure);
                                errorHandler.accept(new ErrorRecords<E>(Iterables.asList(record), failure));
                            }
                        } catch (Throwable handlerFailure) {
                            // Keep the original failure as the primary error in the log.
                            failure.addSuppressed(handlerFailure);
                        }
                        log.error("处理失败", failure);
                    }
                    // Signalled for failed elements too, matching the transactional worker.
                    runnableState.notifyFinished();
                }
            } finally {
                runnableState.decrementActiveCount();
                runnableState.notifyFinished();
                log.trace("处理完成");
            }
        };
    }

    /**
     * Creates a worker that processes the elements of {@code runnableState} in transactional
     * batches.
     *
     * <p>The worker opens one resource, buffers non-null elements, and every time the buffer
     * reaches {@code transactionConsumer.commitCount()} elements (or
     * {@value #DEFAULT_COMMIT_COUNT} when that value is not positive) processes and commits them
     * together. Elements left in the buffer when the source is exhausted are flushed the same
     * way. If processing or committing a batch fails, the batch is rolled back and reported to
     * the consumer currently held by {@code atomicReference} (if any); the worker then continues
     * with the next batch.
     *
     * <p>On exit the resource obtained from {@code openResource()} is always passed to
     * {@code closeResource}; if opening itself failed, {@code closeResource} receives
     * {@code null}. Any other error escaping the main loop is logged and not rethrown.
     *
     * @param runnableState       source of elements and holder of the shared counters
     * @param transactionConsumer batch callbacks (open / process / commit / rollback / close)
     * @param atomicReference     holder of an optional error callback; read anew on every failure
     * @param <E>                 element type
     * @param <Resource>          type of the transactional resource
     * @return the worker; nothing is executed until it is run
     */
    public static <E, Resource> Runnable createDelegate(RunnableState<E> runnableState, TransactionConsumer<E, Resource> transactionConsumer, AtomicReference<Consumer<ErrorRecords<E>>> atomicReference) {
        return () -> {
            // May be null; every statistics update is guarded accordingly.
            RunnableStatistics statistics = runnableState.runnableStatistics();
            runnableState.incrementActiveCount();
            // Declared outside the try so the finally block can close whatever was opened.
            Resource resource = null;
            try {
                int commitCount = transactionConsumer.commitCount();
                if (commitCount <= 0) {
                    commitCount = DEFAULT_COMMIT_COUNT;
                }
                BatchTally tally = new BatchTally();
                ArrayList<ErrorRecord<E>> batch = new ArrayList<>(commitCount);
                resource = transactionConsumer.openResource();
                while (runnableState.hasNext()) {
                    E item = runnableState.next();
                    if (item == null) {
                        continue;
                    }
                    tally.total++;
                    if (statistics != null) {
                        statistics.getTotal().incrementAndGet();
                    }
                    batch.add(new ErrorRecord<>(item, null));
                    if (batch.size() >= commitCount) {
                        flushBatch(transactionConsumer, resource, batch, atomicReference, statistics, tally);
                        // Fresh list: the flushed one may have been handed to the error callback.
                        batch = new ArrayList<>(commitCount);
                    }
                    runnableState.notifyFinished();
                }
                if (!batch.isEmpty()) {
                    flushBatch(transactionConsumer, resource, batch, atomicReference, statistics, tally);
                }
            } catch (Throwable unexpected) {
                log.error("", unexpected);
            } finally {
                log.trace("处理完成");
                try {
                    runnableState.decrementActiveCount();
                    runnableState.notifyFinished();
                } finally {
                    // Runs exactly once, with the real resource (null only if opening failed).
                    transactionConsumer.closeResource(resource);
                }
            }
        };
    }

    /**
     * Processes and commits one batch; on any failure rolls back, reports the batch to the error
     * callback and logs. Never propagates a failure of the batch itself.
     *
     * <p>When {@code processData} fails, the record being processed is tagged with the error
     * before the whole batch is treated as failed.
     */
    private static <E, Resource> void flushBatch(
            TransactionConsumer<E, Resource> transactionConsumer,
            Resource resource,
            ArrayList<ErrorRecord<E>> batch,
            AtomicReference<Consumer<ErrorRecords<E>>> atomicReference,
            RunnableStatistics statistics,
            BatchTally tally) {
        int size = batch.size();
        try {
            for (ErrorRecord<E> record : batch) {
                try {
                    transactionConsumer.processData(resource, record.getData());
                } catch (Throwable itemFailure) {
                    record.setError(itemFailure);
                    throw itemFailure;
                }
            }
            transactionConsumer.commitResource(resource);
            tally.success += size;
            if (statistics != null) {
                statistics.getSuccess().addAndGet(size);
            }
            log.trace("处理/提交成功. total: {}, success: {}, error: {}", tally.total, tally.success, tally.failed);
        } catch (Throwable failure) {
            tally.failed += size;
            if (statistics != null) {
                statistics.getError().addAndGet(size);
            }
            log.trace("处理/提交失败. total: {}, success: {}, error: {}", tally.total, tally.success, tally.failed);
            try {
                transactionConsumer.rollbackResource(resource);
            } catch (Throwable rollbackFailure) {
                log.error("回滚失败", rollbackFailure);
                failure.addSuppressed(rollbackFailure);
            }
            try {
                Consumer<ErrorRecords<E>> errorHandler = atomicReference.get();
                if (errorHandler != null) {
                    ErrorRecords<E> failed = new ErrorRecords<>(batch);
                    failed.setError(failure);
                    errorHandler.accept(failed);
                }
            } catch (Throwable handlerFailure) {
                // Keep the batch failure as the primary error in the log.
                failure.addSuppressed(handlerFailure);
            }
            log.error("处理/提交失败", failure);
        }
    }

    /** Per-worker running totals; used only for the trace log lines. */
    private static final class BatchTally {
        int total;
        int success;
        int failed;
    }
}
