package com.fastjrun.job;

import com.fastjrun.helper.LocalDateTimeHelper;
import com.google.common.collect.Lists;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

/* loaded from: input_file:com/fastjrun/job/DefaultDataFlushJob.class */
public class DefaultDataFlushJob<T> extends BaseDataJob {
    @Override // com.fastjrun.job.BaseJob
    public void execute() {
        if (this.canStart) {
            while (!this.baseDataExecutor.canStart()) {
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            this.baseDataExecutor.start();
            this.log.debug("{} started", this.baseDataExecutor.getClass().getSimpleName());
            LocalDateTime now = LocalDateTime.now();
            AtomicInteger atomicInteger = new AtomicInteger();
            for (int i = 0; i <= this.baseDataExecutor.getPageTotal(); i++) {
                this.log.debug("semaphore.availablePermits()={}", Integer.valueOf(this.semaphore.availablePermits()));
                try {
                    this.semaphore.acquire();
                } catch (InterruptedException e2) {
                    this.log.error(this.semaphore.availablePermits() + "", e2);
                }
                int i2 = i;
                Callable<T> callable = () -> {
                    List<T> fetchItems = this.baseDataExecutor.fetchItems(i2);
                    CopyOnWriteArrayList newCopyOnWriteArrayList = Lists.newCopyOnWriteArrayList();
                    if (fetchItems != null) {
                        fetchItems.parallelStream().forEach(obj -> {
                            newCopyOnWriteArrayList.add(this.baseDataExecutor.batchProcess(obj));
                        });
                    }
                    int sum = newCopyOnWriteArrayList.parallelStream().mapToInt(future -> {
                        try {
                            return ((Integer) future.get()).intValue();
                        } catch (InterruptedException e3) {
                            e3.printStackTrace();
                            return 0;
                        } catch (ExecutionException e4) {
                            e4.printStackTrace();
                            return 0;
                        }
                    }).sum();
                    this.semaphore.release();
                    return Integer.valueOf(sum);
                };
                Future<T> future = null;
                while (future == null) {
                    try {
                        future = this.executorService.submit(callable);
                    } catch (RejectedExecutionException e3) {
                        this.log.warn(this.semaphore.availablePermits() + "", e3);
                    }
                }
                try {
                    atomicInteger.addAndGet(((Integer) future.get()).intValue());
                } catch (InterruptedException e4) {
                    e4.printStackTrace();
                } catch (ExecutionException e5) {
                    e5.printStackTrace();
                }
            }
            this.log.debug("{} ended with data size:{};cost time:{}", new Object[]{this.baseDataExecutor.getClass().getSimpleName(), Integer.valueOf(atomicInteger.get()), LocalDateTimeHelper.formatDuration(now, LocalDateTime.now())});
            this.baseDataExecutor.stop();
        }
    }
}
