package cascading.stats;

import cascading.flow.FlowException;
import cascading.stats.CascadingStats;
import cascading.util.Util;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cascading/stats/CounterCache.class */
public abstract class CounterCache<Config, JobStatus, Counters> {
    public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.counter.timeout.seconds";
    public static final String COUNTER_FETCH_RETRIES_PROPERTY = "cascading.counter.fetch.retries";
    public static final String COUNTER_MAX_AGE_PROPERTY = "cascading.counter.age.max.seconds";
    public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
    public static final int DEFAULT_TIMEOUT_TIMEOUT_SEC = 0;
    public static final int DEFAULT_FETCH_RETRIES = 3;
    public static final int DEFAULT_CACHED_AGE_MAX = 0;
    public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30;
    private static final Logger LOG = LoggerFactory.getLogger(CounterCache.class);
    private static ExecutorService futuresPool = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: cascading.stats.CounterCache.1
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "stats-counter-future");
            thread.setDaemon(true);
            return thread;
        }
    });
    private CascadingStats stats;
    private boolean hasCapturedFinalCounters;
    protected int fetchAttempts;
    protected int maxAge;
    protected final Config configuration;
    private boolean hasAvailableCounters = true;
    private Counters cachedCounters = null;
    private long lastFetch = -1;
    private boolean warnedStale = false;
    protected int timeout = getIntProperty(COUNTER_TIMEOUT_PROPERTY, 0);
    protected int maxFetchAttempts = getIntProperty(COUNTER_FETCH_RETRIES_PROPERTY, 3);

    protected CounterCache(CascadingStats cascadingStats, Config config) {
        this.stats = cascadingStats;
        this.configuration = config;
        if (cascadingStats.getType() == CascadingStats.Type.NODE) {
            this.maxAge = getIntProperty(NODE_COUNTER_MAX_AGE_PROPERTY, 30);
        } else {
            this.maxAge = getIntProperty(COUNTER_MAX_AGE_PROPERTY, 0);
        }
    }

    protected abstract int getIntProperty(String str, int i);

    public long getLastSuccessfulFetch() {
        return this.lastFetch;
    }

    protected abstract JobStatus getJobStatusClient();

    protected abstract boolean areCountersAvailable(JobStatus jobstatus);

    protected abstract Counters getCounters(JobStatus jobstatus) throws IOException;

    protected abstract Collection<String> getGroupNames(Counters counters);

    protected abstract Set<String> getCountersFor(Counters counters, String str);

    protected abstract long getCounterValue(Counters counters, Enum r2);

    protected abstract long getCounterValue(Counters counters, String str, String str2);

    public Collection<String> getCounterGroups() {
        Counters cachedCounters = cachedCounters();
        return cachedCounters == null ? Collections.emptySet() : Collections.unmodifiableCollection(getGroupNames(cachedCounters));
    }

    public Collection<String> getCounterGroupsMatching(String str) {
        Counters cachedCounters = cachedCounters();
        if (cachedCounters == null) {
            return Collections.emptySet();
        }
        HashSet hashSet = new HashSet();
        for (String str2 : getGroupNames(cachedCounters)) {
            if (str2.matches(str)) {
                hashSet.add(str2);
            }
        }
        return Collections.unmodifiableCollection(hashSet);
    }

    public Collection<String> getCountersFor(String str) {
        Counters cachedCounters = cachedCounters();
        return cachedCounters == null ? Collections.emptySet() : Collections.unmodifiableCollection(getCountersFor(cachedCounters, str));
    }

    public long getCounterValue(Enum r5) {
        Counters cachedCounters = cachedCounters();
        if (cachedCounters == null) {
            return 0L;
        }
        return getCounterValue((CounterCache<Config, JobStatus, Counters>) cachedCounters, r5);
    }

    public long getCounterValue(String str, String str2) {
        Counters cachedCounters = cachedCounters();
        if (cachedCounters == null) {
            return 0L;
        }
        return getCounterValue(cachedCounters, str, str2);
    }

    public Counters cachedCounters() {
        return cachedCounters(false);
    }

    public synchronized Counters cachedCounters(boolean z) {
        JobStatus jobStatusClient;
        if (!this.hasAvailableCounters) {
            return this.cachedCounters;
        }
        if (this.fetchAttempts >= this.maxFetchAttempts) {
            if (!this.hasCapturedFinalCounters && !this.warnedStale) {
                if (this.cachedCounters == null) {
                    LOG.warn("no counters fetched, max num consecutive retries reached: {}, type: {}, status: {}", new Object[]{Integer.valueOf(this.maxFetchAttempts), this.stats.getType(), this.stats.getStatus()});
                } else {
                    LOG.warn("stale counters being returned, max num consecutive retries reached, age: {}, type: {}, status: {}", new Object[]{Util.formatDurationFromMillis(System.currentTimeMillis() - this.lastFetch), this.stats.getType(), this.stats.getStatus()});
                }
                this.warnedStale = true;
            }
            return this.cachedCounters;
        }
        boolean isFinished = this.stats.isFinished();
        if (isFinished && this.hasCapturedFinalCounters) {
            return this.cachedCounters;
        }
        if (!z && isFinished) {
            z = true;
        }
        boolean z2 = ((int) ((this.lastFetch - System.currentTimeMillis()) / 1000)) >= this.maxAge;
        if ((this.cachedCounters == null || z || z2) && (jobStatusClient = getJobStatusClient()) != null) {
            if (!areCountersAvailable(jobStatusClient)) {
                this.hasAvailableCounters = false;
                return this.cachedCounters;
            }
            boolean z3 = false;
            try {
                Counters fetchCounters = fetchCounters(jobStatusClient);
                z3 = fetchCounters != null;
                if (z3) {
                    this.cachedCounters = fetchCounters;
                    this.lastFetch = System.currentTimeMillis();
                    this.fetchAttempts = 0;
                }
            } catch (InterruptedException e) {
                LOG.warn("fetching counters was interrupted");
            } catch (ExecutionException e2) {
                this.fetchAttempts++;
                if (this.fetchAttempts >= this.maxFetchAttempts) {
                    LOG.error("fetching counters failed, was final consecutive attempt: {}, type: {}, status: {}", new Object[]{Integer.valueOf(this.fetchAttempts), this.stats.getType(), this.stats.getStatus(), e2.getCause()});
                } else {
                    LOG.warn("fetching counters failed, consecutive attempts: {}, type: {}, status: {}, message: {}", new Object[]{Integer.valueOf(this.fetchAttempts), this.stats.getType(), this.stats.getStatus(), e2.getCause().getMessage()});
                }
                if (this.cachedCounters != null) {
                    LOG.error("returning cached values");
                    return this.cachedCounters;
                }
                LOG.error("unable to get remote counters, no cached values, rethrowing exception", e2.getCause());
                if (e2.getCause() instanceof FlowException) {
                    throw ((FlowException) e2.getCause());
                }
                throw new FlowException(e2.getCause());
            } catch (TimeoutException e3) {
                this.fetchAttempts++;
                if (this.fetchAttempts >= this.maxFetchAttempts) {
                    LOG.warn("fetching counters timed out after: {} seconds, was final consecutive attempt: {}, type: {}, status: {}", new Object[]{Integer.valueOf(this.timeout), Integer.valueOf(this.fetchAttempts), this.stats.getType(), this.stats.getStatus()});
                } else {
                    LOG.warn("fetching counters timed out after: {} seconds, consecutive attempts: {}, type: {}, status: {}", new Object[]{Integer.valueOf(this.timeout), Integer.valueOf(this.fetchAttempts), this.stats.getType(), this.stats.getStatus()});
                }
            }
            this.hasCapturedFinalCounters = isFinished && z3;
            return this.cachedCounters;
        }
        return this.cachedCounters;
    }

    private Counters fetchCounters(JobStatus jobstatus) throws InterruptedException, ExecutionException, TimeoutException {
        if (this.timeout > 0) {
            return runFuture(jobstatus).get(this.timeout, TimeUnit.SECONDS);
        }
        try {
            return getCounters(jobstatus);
        } catch (IOException e) {
            throw new FlowException("unable to get remote counter values", e);
        }
    }

    private Future<Counters> runFuture(final JobStatus jobstatus) {
        return futuresPool.submit(new Callable<Counters>() { // from class: cascading.stats.CounterCache.2
            /* JADX WARN: Multi-variable type inference failed */
            @Override // java.util.concurrent.Callable
            public Counters call() throws Exception {
                try {
                    return (Counters) CounterCache.this.getCounters(jobstatus);
                } catch (IOException e) {
                    throw new FlowException("unable to get remote counter values", e);
                }
            }
        });
    }
}
