/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.storage.config.schema;

import io.datarouter.instrumentation.changelog.ChangelogRecorder;
import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.config.DatarouterAdministratorEmailService;
import io.datarouter.storage.config.DatarouterProperties;
import io.datarouter.storage.config.executor.DatarouterStorageExecutors;
import io.datarouter.storage.config.schema.SchemaUpdateResult;
import io.datarouter.storage.config.storage.clusterschemaupdatelock.ClusterSchemaUpdateLock;
import io.datarouter.storage.config.storage.clusterschemaupdatelock.ClusterSchemaUpdateLockKey;
import io.datarouter.storage.config.storage.clusterschemaupdatelock.DatarouterClusterSchemaUpdateLockDao;
import io.datarouter.storage.node.type.physical.PhysicalNode;
import io.datarouter.util.mutable.MutableString;
import io.datarouter.util.singletonsupplier.SingletonSupplier;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.inject.Provider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that queues per-node schema update tasks on an executor, periodically gathers their
 * results, and - once no queued task is still pending - emails the collected DDL and records it in
 * the changelog, guarded by a cluster schema update lock.
 *
 * <p>Subclasses supply the task that computes the update ({@link #makeSchemaUpdateCallable}), the
 * way existing table names are fetched ({@link #fetchExistingTables}) and the email transport
 * ({@link #sendEmail(String, String, String, String)}).
 *
 * <p>Thread-safety: {@link #queueNodeForSchemaUpdate} may be called concurrently with the scheduled
 * gathering. The pending-future list is a synchronized list and is only traversed through a
 * snapshot taken while holding the list's own monitor.
 */
public abstract class BaseSchemaUpdateService {
    private static final Logger logger = LoggerFactory.getLogger(BaseSchemaUpdateService.class);
    /** Delay, in seconds, between two scheduled runs of {@link #gatherSchemaUpdates()}. */
    private static final long THROTTLING_DELAY_SECONDS = 10L;
    private final DatarouterProperties datarouterProperties;
    private final DatarouterAdministratorEmailService adminEmailService;
    private final DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler executor;
    private final Provider<DatarouterClusterSchemaUpdateLockDao> schemaUpdateLockDao;
    private final Provider<ChangelogRecorder> changelogRecorder;
    private final String buildId;
    /** One lazily evaluated table-name supplier per client, created on first use. */
    private final Map<ClientId, Supplier<List<String>>> existingTableNamesByClient;
    /** Submitted schema update tasks whose results have not been consumed yet. */
    private final List<Future<Optional<SchemaUpdateResult>>> futures;

    /**
     * Stores the collaborators and schedules {@link #gatherSchemaUpdates()} on the given executor
     * with no initial delay and a fixed delay of {@link #THROTTLING_DELAY_SECONDS} seconds.
     *
     * @param datarouterProperties source of the environment, server name and administrator email
     * @param adminEmailService source of the administrator email addresses CSV
     * @param executor executor used both for the periodic gathering and for the update tasks
     * @param schemaUpdateLockDao provider of the DAO used to acquire the cluster lock
     * @param changelogRecorder provider of the recorder used to log the gathered DDL
     * @param buildId build identifier; may be {@code null} (see {@code acquireSchemaUpdateLock})
     */
    public BaseSchemaUpdateService(DatarouterProperties datarouterProperties, DatarouterAdministratorEmailService adminEmailService, DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler executor, Provider<DatarouterClusterSchemaUpdateLockDao> schemaUpdateLockDao, Provider<ChangelogRecorder> changelogRecorder, String buildId) {
        this.datarouterProperties = datarouterProperties;
        this.adminEmailService = adminEmailService;
        this.executor = executor;
        this.schemaUpdateLockDao = schemaUpdateLockDao;
        this.changelogRecorder = changelogRecorder;
        this.buildId = buildId;
        this.futures = Collections.synchronizedList(new ArrayList<>());
        this.existingTableNamesByClient = new ConcurrentHashMap<>();
        executor.scheduleWithFixedDelay(this::gatherSchemaUpdates, 0L, THROTTLING_DELAY_SECONDS, TimeUnit.SECONDS);
    }

    /**
     * Submits a schema update task for the node and remembers its future so that a later
     * {@link #gatherSchemaUpdates(boolean)} call can consume the result.
     *
     * @param clientId client owning the node; used as the key for the shared table-name supplier
     * @param node node to build the schema update task for
     * @return the future of the submitted task
     */
    public Future<Optional<SchemaUpdateResult>> queueNodeForSchemaUpdate(ClientId clientId, PhysicalNode<?, ?, ?> node) {
        Supplier<List<String>> existingTableNames = this.existingTableNamesByClient.computeIfAbsent(clientId, this::lazyFetchExistingTables);
        Future<Optional<SchemaUpdateResult>> future = this.executor.submit(this.makeSchemaUpdateCallable(clientId, existingTableNames, node));
        this.futures.add(future);
        return future;
    }

    /**
     * Builds the task that computes the schema update for one node.
     *
     * @param var1 client owning the node
     * @param var2 supplier of the client's existing table names
     * @param var3 node to inspect
     * @return the task; its result is empty when there is nothing to report
     */
    protected abstract Callable<Optional<SchemaUpdateResult>> makeSchemaUpdateCallable(ClientId var1, Supplier<List<String>> var2, PhysicalNode<?, ?, ?> var3);

    /** Scheduled entry point: gathers only the results that are already available. */
    private void gatherSchemaUpdates() {
        this.gatherSchemaUpdates(false);
    }

    /**
     * Consumes the results of the queued schema update tasks, grouping their DDL by client.
     *
     * <p>When every inspected task has completed, and the cluster lock can be acquired, the
     * gathered DDL is emailed and recorded in the changelog. If any consumed result carries a
     * startup block reason, that reason is logged and thrown after the notification step; when
     * several results carry one, the last one seen wins.
     *
     * @param wait when {@code true}, block on tasks that are still running; when {@code false},
     *     skip them and suppress the notification for this run
     * @throws RuntimeException if a task failed, if the thread was interrupted while waiting, or if
     *     a result carried a startup block reason
     */
    public synchronized void gatherSchemaUpdates(boolean wait) {
        // Iterating a synchronizedList is only safe while holding its monitor. A snapshot keeps that
        // critical section short and avoids blocking producers while waiting on future.get().
        List<Future<Optional<SchemaUpdateResult>>> snapshot;
        synchronized (this.futures) {
            snapshot = new ArrayList<>(this.futures);
        }
        boolean shouldNotify = true;
        Map<ClientId, List<String>> printedSchemaUpdates = new HashMap<>();
        MutableString oneStartupBlockReason = new MutableString("");
        List<Future<Optional<SchemaUpdateResult>>> consumed = new ArrayList<>();
        try {
            for (Future<Optional<SchemaUpdateResult>> future : snapshot) {
                if (!wait && !future.isDone()) {
                    shouldNotify = false;
                    continue;
                }
                Optional<SchemaUpdateResult> optional;
                try {
                    optional = future.get();
                } catch (InterruptedException e) {
                    // Restore the flag so callers further up the stack can observe the interrupt.
                    Thread.currentThread().interrupt();
                    logger.error("", e);
                    throw new RuntimeException(e);
                } catch (ExecutionException e) {
                    logger.error("", e);
                    throw new RuntimeException(e);
                }
                // Completed futures are dropped even when their result is empty, so the pending
                // list does not grow without bound.
                consumed.add(future);
                if (optional.isEmpty()) {
                    continue;
                }
                SchemaUpdateResult result = optional.get();
                printedSchemaUpdates.computeIfAbsent(result.clientId, key -> new ArrayList<>()).add(result.ddl);
                result.startupBlockReason.ifPresent(reason -> oneStartupBlockReason.set(reason));
            }
        } finally {
            // Runs on the failure path too: results consumed before a failing future are removed,
            // while the failing future itself stays queued.
            this.futures.removeAll(consumed);
        }
        boolean isBlocking = !oneStartupBlockReason.getString().isEmpty();
        if (shouldNotify && this.acquireSchemaUpdateLock(printedSchemaUpdates)) {
            this.sendEmail(printedSchemaUpdates, isBlocking);
            this.recordChangelog(printedSchemaUpdates);
        }
        if (isBlocking) {
            logger.error(oneStartupBlockReason.getString());
            throw new RuntimeException(oneStartupBlockReason.getString());
        }
    }

    /**
     * Sends one email per client containing all of that client's DDL statements.
     *
     * @param printedSchemaUpdates DDL statements grouped by client; nothing is sent when empty
     * @param isBlocking whether to mark the subject as blocking
     */
    private void sendEmail(Map<ClientId, List<String>> printedSchemaUpdates, boolean isBlocking) {
        if (printedSchemaUpdates.isEmpty()) {
            return;
        }
        String blocking = isBlocking ? " - Blocking " : " ";
        printedSchemaUpdates.forEach((clientId, ddlList) -> {
            String subject = "SchemaUpdate Request" + blocking + "on " + clientId.getName() + " from " + this.datarouterProperties.getEnvironment();
            String body = joinStatements(ddlList);
            logger.warn("Sending schema update email for client={}", clientId.getName());
            this.sendEmail(this.datarouterProperties.getAdministratorEmail(), this.adminEmailService.getAdministratorEmailAddressesCsv(), subject, body);
        });
    }

    /**
     * Delivers a single email. As called from this class the arguments are, in order: the
     * administrator email from the properties, the administrator email addresses CSV, the subject
     * and the body.
     */
    protected abstract void sendEmail(String var1, String var2, String var3, String var4);

    /** Wraps {@link #fetchExistingTables} in a {@link SingletonSupplier} for the given client. */
    private Supplier<List<String>> lazyFetchExistingTables(ClientId clientId) {
        return SingletonSupplier.of(() -> this.fetchExistingTables(clientId));
    }

    /**
     * Fetches the names of the tables that currently exist for the client.
     *
     * @param var1 client to inspect
     * @return the existing table names
     */
    protected abstract List<String> fetchExistingTables(ClientId var1);

    /**
     * Tries to acquire the cluster schema update lock for the gathered statements.
     *
     * @param printedSchemaUpdates DDL statements grouped by client
     * @return {@code true} when the lock was stored; {@code false} when there is nothing to lock
     *     for or when storing the lock threw
     */
    private boolean acquireSchemaUpdateLock(Map<ClientId, List<String>> printedSchemaUpdates) {
        if (printedSchemaUpdates.isEmpty()) {
            return false;
        }
        // NOTE(review): only the first map entry's statements go into the lock, and the map is a
        // HashMap, so with several clients the chosen entry is not a stable choice - confirm intent.
        String statement = printedSchemaUpdates.entrySet().stream()
                .findFirst()
                .map(entry -> String.join("\n\n", entry.getValue()))
                .get();
        Instant now = Instant.now();
        // Falls back to the current epoch second when buildId is null or still the literal
        // placeholder (presumably an unresolved build property - TODO confirm).
        Integer build = Optional.ofNullable(this.buildId)
                .filter(id -> !"${env.BUILD_NUMBER}".equals(id))
                .map(Integer::valueOf)
                .orElseGet(() -> (int)now.getEpochSecond());
        ClusterSchemaUpdateLock lock = new ClusterSchemaUpdateLock(build, statement, this.datarouterProperties.getServerName(), now);
        Object statementHash = ((ClusterSchemaUpdateLockKey)lock.getKey()).getStatementHash();
        try {
            this.schemaUpdateLockDao.get().putAndAcquire(lock);
            logger.warn("Acquired schema update lock for hash={}", statementHash);
            return true;
        }
        catch (Exception ex) {
            // Any failure to store the lock is treated as "not acquired" and is deliberately not
            // propagated.
            logger.warn("Didn't acquire schema update lock for hash={}", statementHash);
            return false;
        }
    }

    /**
     * Records one changelog entry per client containing all of that client's DDL statements.
     *
     * @param printedSchemaUpdates DDL statements grouped by client
     */
    private void recordChangelog(Map<ClientId, List<String>> printedSchemaUpdates) {
        printedSchemaUpdates.forEach((clientId, ddlList) -> this.changelogRecorder.get().record(
                "SchemaUpdate",
                "clientId: " + clientId.getName(),
                "SchemaUpdate request",
                this.datarouterProperties.getAdministratorEmail(),
                joinStatements(ddlList)));
    }

    /** Concatenates the statements, appending a blank line ({@code "\n\n"}) after each one. */
    private static String joinStatements(List<String> ddlList) {
        StringBuilder allStatements = new StringBuilder();
        for (String ddl : ddlList) {
            allStatements.append(ddl).append("\n\n");
        }
        return allStatements.toString();
    }
}

