/*
 * Decompiled with CFR 0.152.
 */
package com.spotify.helios.agent;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.MoreExecutors;
import com.spotify.helios.common.descriptors.JobId;
import com.spotify.helios.common.descriptors.TaskStatus;
import com.spotify.helios.common.descriptors.TaskStatusEvent;
import com.spotify.helios.servicescommon.PersistentAtomicReference;
import com.spotify.helios.servicescommon.coordination.Paths;
import com.spotify.helios.servicescommon.coordination.ZooKeeperClient;
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Queues task status events per job and writes them to ZooKeeper from a background thread.
 *
 * <p>Events are kept in one deque per {@link JobId}. Each deque is capped at
 * {@link #MAX_QUEUE_SIZE} events and the total across all deques is capped at
 * {@link #MAX_TOTAL_SIZE}; when a cap is hit the oldest events are discarded. After every
 * insertion the whole map is handed to the backing store so that queued events are reloaded by
 * the constructor on the next start.
 *
 * <p>Once started, {@link #run()} is invoked once per second on a single-threaded scheduled
 * executor and drains the queue into ZooKeeper, eldest event first.
 *
 * <p>Locking: mutations of a deque happen while holding that deque's monitor; adding or
 * removing a deque from {@code items} happens while holding the monitor of {@code items}. The
 * only place both are held at once is {@link #getNext()}, always in the order deque then map.
 */
public class QueueingHistoryWriter
extends AbstractIdleService
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(QueueingHistoryWriter.class);

    /** Maximum number of status event nodes kept in ZooKeeper per job and host. */
    public static final int MAX_NUMBER_STATUS_EVENTS_TO_RETAIN = 30;
    /** Maximum number of events queued in memory for a single job. */
    private static final int MAX_QUEUE_SIZE = 30;
    /** Maximum number of events queued in memory across all jobs. */
    private static final int MAX_TOTAL_SIZE = 600;

    private final ConcurrentMap<JobId, Deque<TaskStatusEvent>> items;
    private final ScheduledExecutorService zkWriterExecutor = MoreExecutors.getExitingScheduledExecutorService((ScheduledThreadPoolExecutor)Executors.newScheduledThreadPool(1), 0L, TimeUnit.SECONDS);
    private final String hostname;
    /** Number of events currently queued across all deques in {@code items}. */
    private final AtomicInteger count;
    private final ZooKeeperClient client;
    private final PersistentAtomicReference<ConcurrentMap<JobId, Deque<TaskStatusEvent>>> backingStore;

    /**
     * Creates a writer, loading any previously queued events from {@code backingFile}.
     *
     * @param hostname    host name recorded in every event and used to build the ZooKeeper paths
     * @param client      ZooKeeper client used to write and trim history nodes
     * @param backingFile file used to persist the in-memory queue
     * @throws IOException          if the backing store cannot be created
     * @throws InterruptedException if interrupted while creating the backing store
     */
    public QueueingHistoryWriter(String hostname, ZooKeeperClient client, Path backingFile) throws IOException, InterruptedException {
        this.hostname = hostname;
        this.client = client;
        this.backingStore = PersistentAtomicReference.create(backingFile, new TypeReference<ConcurrentMap<JobId, Deque<TaskStatusEvent>>>(){}, new Supplier<ConcurrentMap<JobId, Deque<TaskStatusEvent>>>(){

            @Override
            public ConcurrentMap<JobId, Deque<TaskStatusEvent>> get() {
                return Maps.newConcurrentMap();
            }
        });
        this.items = this.backingStore.get();

        // Drop keys whose deque is missing so the rest of the class never sees a null deque
        // for a loaded key. Iterate over a snapshot because we mutate the map while looping.
        final ImmutableSet<JobId> loadedKeys = ImmutableSet.copyOf(this.items.keySet());
        for (final JobId key : loadedKeys) {
            if (this.items.get(key) == null) {
                this.items.remove(key);
            }
        }

        // Seed the global counter with whatever was loaded from the backing store.
        int loadedCount = 0;
        for (final Deque<TaskStatusEvent> deque : this.items.values()) {
            loadedCount += deque.size();
        }
        this.count = new AtomicInteger(loadedCount);
    }

    /** Schedules {@link #run()} to execute once per second, starting after one second. */
    @Override
    protected void startUp() throws Exception {
        this.zkWriterExecutor.scheduleAtFixedRate(this, 1L, 1L, TimeUnit.SECONDS);
    }

    /** Stops the writer thread and waits up to one minute for it to terminate. */
    @Override
    protected void shutDown() throws Exception {
        this.zkWriterExecutor.shutdownNow();
        this.zkWriterExecutor.awaitTermination(1L, TimeUnit.MINUTES);
    }

    /**
     * Appends an event to the deque of its job, discarding old events if a size cap is reached,
     * and then persists the queue to the backing store.
     *
     * @param item the event to queue
     * @throws InterruptedException declared for the backing store write
     */
    private void add(TaskStatusEvent item) throws InterruptedException {
        // Enforce the global cap by discarding the eldest events.
        while (this.count.get() >= MAX_TOTAL_SIZE) {
            if (this.getNext() == null) {
                // Nothing left to discard although the counter says we are full. Looping again
                // would spin forever, so report it and carry on with the insertion.
                log.warn("History queue count is {} but no queued event could be discarded", (Object)this.count.get());
                break;
            }
        }

        final JobId key = item.getStatus().getJob().getId();
        while (true) {
            final Deque<TaskStatusEvent> deque = this.getDeque(key);
            synchronized (deque) {
                if (this.items.get(key) != deque) {
                    // getNext() unmapped this deque between getDeque() and taking the lock.
                    // Adding to it would lose the event and inflate the counter; retry.
                    continue;
                }
                // Enforce the per-job cap by rolling off the oldest events.
                while (deque.size() >= MAX_QUEUE_SIZE) {
                    deque.remove();
                    this.count.decrementAndGet();
                }
                deque.add(item);
                this.count.incrementAndGet();
            }
            break;
        }

        try {
            this.backingStore.set(this.items);
        }
        catch (ClosedByInterruptException e) {
            log.debug("Writing task status event to backing store was interrupted");
        }
        catch (IOException e) {
            log.warn("Failed to write task status event to backing store", e);
        }
    }

    /**
     * Returns the deque mapped to {@code key}, creating and mapping an empty one if absent.
     *
     * @param key the job whose deque is wanted
     * @return the deque currently mapped to {@code key}; never null
     */
    private Deque<TaskStatusEvent> getDeque(JobId key) {
        synchronized (this.items) {
            final Deque<TaskStatusEvent> existing = this.items.get(key);
            if (existing != null) {
                return existing;
            }
            final Deque<TaskStatusEvent> created = new ConcurrentLinkedDeque<TaskStatusEvent>();
            this.items.put(key, created);
            return created;
        }
    }

    /**
     * Queues a status event stamped with the current time.
     *
     * @param jobId  not used; the queue key is taken from {@code status.getJob().getId()}
     * @param status the task status to record
     * @throws InterruptedException declared for the backing store write
     */
    public void saveHistoryItem(JobId jobId, TaskStatus status) throws InterruptedException {
        this.saveHistoryItem(jobId, status, System.currentTimeMillis());
    }

    /**
     * Queues a status event with an explicit timestamp.
     *
     * @param jobId     not used; the queue key is taken from {@code status.getJob().getId()}
     * @param status    the task status to record
     * @param timestamp the event time passed to the {@link TaskStatusEvent}
     * @throws InterruptedException declared for the backing store write
     */
    public void saveHistoryItem(JobId jobId, TaskStatus status, long timestamp) throws InterruptedException {
        this.add(new TaskStatusEvent(status, timestamp, this.hostname));
    }

    /**
     * Removes and returns the eldest queued event, or null if nothing is queued.
     *
     * <p>The eldest event is located without locking; the owning deque is then locked and the
     * event is taken only if it is still at the head. If it was rolled off in the meantime the
     * search starts over. The head check and the removal happen under the same lock so that a
     * concurrent {@link #add} cannot replace the head between the two.
     *
     * @return the eldest event, or null when all deques are empty
     */
    private TaskStatusEvent getNext() {
        while (true) {
            final TaskStatusEvent current = this.findEldestEvent();
            if (current == null) {
                return null;
            }

            final JobId id = current.getStatus().getJob().getId();
            final Deque<TaskStatusEvent> deque = this.items.get(id);
            if (deque == null) {
                continue;
            }

            synchronized (deque) {
                // peek() is null when the deque was drained in the meantime; either way the
                // event we picked is no longer the head, so look again.
                if (!current.equals(deque.peek())) {
                    continue;
                }

                final TaskStatusEvent newCurrent = deque.poll();
                this.count.decrementAndGet();
                Preconditions.checkState(current.equals(newCurrent), (Object)"current should equal newCurrent");

                // Unmap the deque once drained so the map does not accumulate empty deques.
                // Done while still holding the deque lock so add()/putBack() can detect it.
                synchronized (this.items) {
                    final Deque<TaskStatusEvent> curDeque = this.items.get(id);
                    if (curDeque != null && curDeque.isEmpty()) {
                        this.items.remove(id);
                    }
                }
                return current;
            }
        }
    }

    /**
     * Reports whether the event counter is zero.
     *
     * @return true if no events are counted as queued
     */
    public boolean isEmpty() {
        return this.count.get() == 0;
    }

    /**
     * Returns an event to the front of its job's deque so that it is retried first. The event
     * is silently dropped if the deque is already at {@link #MAX_QUEUE_SIZE}.
     *
     * @param event the event that could not be written
     */
    private void putBack(TaskStatusEvent event) {
        final JobId key = event.getStatus().getJob().getId();
        while (true) {
            final Deque<TaskStatusEvent> queue = this.getDeque(key);
            synchronized (queue) {
                if (this.items.get(key) != queue) {
                    // The deque was unmapped by getNext() before we got the lock; retry so the
                    // event lands in a deque that is actually reachable.
                    continue;
                }
                if (queue.size() >= MAX_QUEUE_SIZE) {
                    return;
                }
                queue.push(event);
                this.count.incrementAndGet();
                return;
            }
        }
    }

    /**
     * Finds the event with the smallest timestamp among the heads of all deques.
     *
     * <p>No locks are taken here; {@link #getNext()} re-validates the result under the deque
     * lock before removing it.
     *
     * @return the eldest head event, or null if every deque is empty
     */
    private TaskStatusEvent findEldestEvent() {
        TaskStatusEvent eldest = null;
        for (final Deque<TaskStatusEvent> queue : this.items.values()) {
            if (queue == null) {
                continue;
            }
            final TaskStatusEvent head = queue.peek();
            if (head == null) {
                // Empty deque: freshly created by getDeque() or drained but not yet unmapped.
                continue;
            }
            if (eldest == null || head.getTimestamp() < eldest.getTimestamp()) {
                eldest = head;
            }
        }
        return eldest;
    }

    /**
     * Drains the queue into ZooKeeper, eldest event first.
     *
     * <p>Each event is written to its own timestamped node; if the job then has more than
     * {@link #MAX_NUMBER_STATUS_EVENTS_TO_RETAIN} nodes the oldest are deleted. A node that
     * already exists is treated as success. On any other ZooKeeper error the event is put back
     * and the pass ends; the next scheduled pass retries it.
     *
     * <p>Unchecked exceptions are logged rather than propagated, because an exception escaping
     * a task scheduled with {@code scheduleAtFixedRate} suppresses all further executions.
     */
    @Override
    public void run() {
        try {
            while (true) {
                final TaskStatusEvent item = this.getNext();
                if (item == null) {
                    return;
                }
                try {
                    final JobId jobId = item.getStatus().getJob().getId();
                    final String historyPath = Paths.historyJobHostEventsTimestamp(jobId, this.hostname, item.getTimestamp());
                    log.debug("writing queued item to zookeeper {} {}", (Object)item.getStatus().getJob().getId(), (Object)item.getTimestamp());
                    this.client.ensurePath(historyPath, true);
                    this.client.createAndSetData(historyPath, item.getStatus().toJsonBytes());

                    final List<String> events = this.client.getChildren(Paths.historyJobHostEvents(jobId, this.hostname));
                    if (events.size() > MAX_NUMBER_STATUS_EVENTS_TO_RETAIN) {
                        this.trimStatusEvents(events, jobId);
                    }
                }
                catch (KeeperException.NodeExistsException e) {
                    // The node we wanted to create is there, which is all we need.
                    log.debug("item we wanted in is already there");
                }
                catch (KeeperException.ConnectionLossException e) {
                    log.warn("Connection lost while putting item into zookeeper, will retry");
                    this.putBack(item);
                    return;
                }
                catch (KeeperException e) {
                    log.error("Error putting item into zookeeper, will retry", e);
                    this.putBack(item);
                    return;
                }
            }
        }
        catch (RuntimeException e) {
            log.error("Unexpected error while writing task history to zookeeper", e);
        }
    }

    /**
     * Deletes the oldest event nodes of a job so that at most
     * {@link #MAX_NUMBER_STATUS_EVENTS_TO_RETAIN} remain. Deletion failures are logged and
     * skipped.
     *
     * @param events child node names of the job's event path; each is parsed as a long
     * @param jobId  the job whose events are trimmed
     */
    private void trimStatusEvents(List<String> events, JobId jobId) {
        final List<Long> timestamps = Lists.newArrayList(Iterables.transform(events, new Function<String, Long>(){

            @Override
            public Long apply(String name) {
                return Long.valueOf(name);
            }
        }));
        // Ascending order puts the oldest timestamps first.
        Collections.sort(timestamps);

        final int excess = timestamps.size() - MAX_NUMBER_STATUS_EVENTS_TO_RETAIN;
        for (int i = 0; i < excess; ++i) {
            try {
                this.client.delete(Paths.historyJobHostEventsTimestamp(jobId, this.hostname, timestamps.get(i)));
            }
            catch (KeeperException e) {
                log.warn("failure deleting overflow of status items - we're hoping a later execution will fix", e);
            }
        }
    }
}

