/*
 * Decompiled with CFR 0.152.
 */
package com.fastjrun.job;

import com.fastjrun.helper.LocalDateTimeHelper;
import com.fastjrun.job.BaseDataJob;
import com.google.common.collect.Lists;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

public class DefaultDataFlushJob<T>
extends BaseDataJob {
    @Override
    public void execute() {
        if (!this.canStart) {
            return;
        }
        while (!this.baseDataExecutor.canStart()) {
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.baseDataExecutor.start();
        this.log.debug("{} started", (Object)this.baseDataExecutor.getClass().getSimpleName());
        LocalDateTime start = LocalDateTime.now();
        AtomicInteger resTotal = new AtomicInteger();
        for (int pageIndex = 0; pageIndex <= this.baseDataExecutor.getPageTotal(); ++pageIndex) {
            this.log.debug("semaphore.availablePermits()={}", (Object)this.semaphore.availablePermits());
            try {
                this.semaphore.acquire();
            }
            catch (InterruptedException e) {
                this.log.error(this.semaphore.availablePermits() + "", (Throwable)e);
            }
            int pageFinal = pageIndex;
            Callable<Integer> callable = () -> {
                List items = this.baseDataExecutor.fetchItems(pageFinal);
                CopyOnWriteArrayList resList = Lists.newCopyOnWriteArrayList();
                if (items != null) {
                    items.parallelStream().forEach(item -> resList.add(this.baseDataExecutor.batchProcess(item)));
                }
                int total = resList.parallelStream().mapToInt(var -> {
                    try {
                        return (Integer)var.get();
                    }
                    catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    catch (ExecutionException e) {
                        e.printStackTrace();
                    }
                    return 0;
                }).sum();
                this.semaphore.release();
                return total;
            };
            Future<Integer> future = null;
            while (future == null) {
                try {
                    future = this.executorService.submit(callable);
                }
                catch (RejectedExecutionException e) {
                    this.log.warn(this.semaphore.availablePermits() + "", (Throwable)e);
                }
            }
            try {
                resTotal.addAndGet((Integer)future.get());
                continue;
            }
            catch (InterruptedException e) {
                e.printStackTrace();
                continue;
            }
            catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        LocalDateTime end = LocalDateTime.now();
        String duration = LocalDateTimeHelper.formatDuration((LocalDateTime)start, (LocalDateTime)end);
        this.log.debug("{} ended with data size:{};cost time:{}", new Object[]{this.baseDataExecutor.getClass().getSimpleName(), resTotal.get(), duration});
        this.baseDataExecutor.stop();
    }
}

