package com.yahoo.sql4d.indexeragent.actors;

import akka.actor.UntypedActor;
import com.yahoo.sql4d.indexeragent.Agent;
import com.yahoo.sql4d.indexeragent.DruidMeta;
import com.yahoo.sql4d.indexeragent.meta.JobFreq;
import com.yahoo.sql4d.indexeragent.meta.JobStatus;
import com.yahoo.sql4d.indexeragent.meta.Utils;
import com.yahoo.sql4d.indexeragent.meta.beans.DataSource;
import com.yahoo.sql4d.indexeragent.meta.beans.StatusTrail;
import com.yahoo.sql4d.indexeragent.util.UniqueOnlyQueue;
import com.yahoo.sql4d.sql4ddriver.DDataSource;
import com.yahoo.sql4d.sql4ddriver.Joiner4All;
import com.yahoo.sql4d.sql4ddriver.Mapper4All;
import java.util.ArrayList;
import java.util.List;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.util.Either;

/* loaded from: input_file:com/yahoo/sql4d/indexeragent/actors/WorkerActor.class */
public class WorkerActor extends UntypedActor {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) WorkerActor.class);
    private static final DateTimeFormatter isoFormat = ISODateTimeFormat.dateTime().withZoneUTC();
    private static final UniqueOnlyQueue<StatusTrail> newWorkQueue = new UniqueOnlyQueue<>();
    private static final UniqueOnlyQueue<StatusTrail> inProgressWorkQueue = new UniqueOnlyQueue<>();
    private final DDataSource druidDriver = new DDataSource(DruidMeta.getBrokerHost(), DruidMeta.getBrokerPort(), DruidMeta.getCoordinatorHost(), DruidMeta.getCoordinatorPort(), DruidMeta.getOverlordHost(), DruidMeta.getOverlordPort());

    @Override // akka.actor.UntypedActor
    public void onReceive(Object obj) throws Exception {
        if (!(obj instanceof MessageTypes)) {
            unhandled(obj);
            return;
        }
        switch ((MessageTypes) obj) {
            case GENERATE_WORK:
                generateWork();
                return;
            case EXECUTE_WORK:
                executeWork();
                return;
            case TRACK_WORK:
                checkInProgressWork();
                return;
            default:
                throw new UnsupportedOperationException("Worker received unknown message ." + obj);
        }
    }

    private void generateWork() {
        List<DataSource> allDataSources = Agent.db().getAllDataSources();
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList();
        for (DataSource dataSource : allDataSources) {
            if (dataSource.getStatus() == null) {
                log.error("DataSource {}'s status is null", dataSource);
            } else if (JobStatus.valueOf(dataSource.getStatus()) != JobStatus.done) {
                long spinFromTime = dataSource.getSpinFromTime();
                while (currentTimeMillis > spinFromTime + Agent.getTaskAttemptDelayInMillis() && spinFromTime < dataSource.getEndTime()) {
                    StatusTrail status = new StatusTrail().setNominalTime(spinFromTime).setAttemptsDone(0).setDataSourceId(dataSource.getId()).setGivenUp(0).setStatus(JobStatus.not_done);
                    Agent.db().addStatusTrail(status);
                    spinFromTime += JobFreq.valueOf(dataSource.getFrequency()).inMillis();
                    log.info("Generated work : {}", status);
                }
                if (spinFromTime > dataSource.getEndTime()) {
                    dataSource.setStatus(JobStatus.done);
                }
                arrayList.add(Agent.db().getIncompleteTasks(dataSource));
                Agent.db().updateDataSource(dataSource.setSpinFromTime(spinFromTime));
            }
        }
        newWorkQueue.clearAndMergeKLists(arrayList);
    }

    private void executeWork() {
        StatusTrail removeFirst = newWorkQueue.removeFirst();
        if (removeFirst != null) {
            log.info("New task {}", removeFirst);
            String materializeTemplate = materializeTemplate(Agent.db().getDataSource(removeFirst.getDataSourceId()).getTemplateSql(), removeFirst.getNominalTime(), removeFirst.getNominalTime(), removeFirst.getNominalTime() + JobFreq.valueOf(r0.getFrequency()).inMillis());
            log.info("Sql is {}", materializeTemplate);
            Either<String, Either<Joiner4All, Mapper4All>> query = this.druidDriver.query(materializeTemplate, null, null, false, "sql", true);
            if (!query.isLeft()) {
                log.error("Got weird result (expected to run insert) {}", query.right().get());
                return;
            }
            String str = query.left().get();
            log.info("Submitted task {} to overlord ", str);
            removeFirst.setTaskId(str);
            removeFirst.setStatus(JobStatus.in_progress);
            Agent.db().updateStatusTrail(removeFirst);
            if (newWorkQueue.contains(removeFirst)) {
                newWorkQueue.remove(removeFirst);
                log.warn("newWorkQueue had one more entry for st {} which was just set to in_progress", removeFirst);
            }
        }
    }

    private void checkInProgressWork() {
        inProgressWorkQueue.addAll(Agent.db().getAllInprogressTasks());
        while (true) {
            StatusTrail removeFirst = inProgressWorkQueue.removeFirst();
            if (removeFirst == null) {
                return;
            }
            if (removeFirst.getTaskId() != null) {
                switch (this.druidDriver.pollIndexerTaskStatus(removeFirst.getTaskId())) {
                    case SUCCESS:
                        newWorkQueue.remove(removeFirst);
                        removeFirst.setStatus(JobStatus.done);
                        removeFirst.setAttemptsDone(removeFirst.getAttemptsDone() + 1);
                        Agent.db().updateStatusTrail(removeFirst);
                        log.info("Task {} succeeded.", removeFirst);
                        break;
                    case FAILED:
                        newWorkQueue.remove(removeFirst);
                        removeFirst.setStatus(JobStatus.not_done);
                        removeFirst.setAttemptsDone(removeFirst.getAttemptsDone() + 1);
                        Agent.db().updateStatusTrail(removeFirst);
                        log.info("Task {} failed.", removeFirst);
                        break;
                    case RUNNING:
                        log.info("Task {} in progress.", removeFirst);
                        break;
                    case UNKNOWN:
                        log.info("Task {} status unknown.", removeFirst);
                        break;
                }
            } else {
                log.info("Found task with null taskId {}.", removeFirst);
                removeFirst.setStatus(JobStatus.not_done).setAttemptsDone(0).setGivenUp(0);
                Agent.db().updateStatusTrail(removeFirst);
                log.info("Marked the status as not done to ensure it is resubmitted {}", removeFirst);
            }
        }
    }

    private String materializeTemplate(String str, long j, long j2, long j3) {
        DateTime dateTime = new DateTime(j, DateTimeZone.UTC);
        return str.replaceAll("\\{YEAR\\}", Utils.year(dateTime)).replaceAll("\\{YEAR\\}", Utils.year(dateTime)).replaceAll("\\{MONTH\\}", Utils.month(dateTime)).replaceAll("\\{DAY\\}", Utils.day(dateTime)).replaceAll("\\{HOUR\\}", Utils.hour(dateTime)).replaceAll("\\{MIN\\}", Utils.mins(dateTime)).replaceAll(":startTime", isoFormat.print(j2)).replaceAll(":endTime", isoFormat.print(j3));
    }
}
