/*
 * Decompiled with CFR 0.152.
 */
package io.datarouter.storage;

import io.datarouter.storage.client.ClientId;
import io.datarouter.storage.client.SchemaUpdateResult;
import io.datarouter.storage.config.DatarouterAdministratorEmailService;
import io.datarouter.storage.config.DatarouterProperties;
import io.datarouter.storage.config.executor.DatarouterStorageExecutors;
import io.datarouter.storage.node.type.physical.PhysicalNode;
import io.datarouter.util.lazy.Lazy;
import io.datarouter.util.mutable.MutableString;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseSchemaUpdateService {
    private static final Logger logger = LoggerFactory.getLogger(BaseSchemaUpdateService.class);
    private static final long THROTTLING_DELAY_SECONDS = 10L;
    private final DatarouterProperties datarouterProperties;
    private final DatarouterAdministratorEmailService adminEmailService;
    private final DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler executor;
    private final Map<ClientId, Lazy<List<String>>> existingTableNamesByClient;
    private final List<Future<Optional<SchemaUpdateResult>>> futures;

    public BaseSchemaUpdateService(DatarouterProperties datarouterProperties, DatarouterAdministratorEmailService adminEmailService, DatarouterStorageExecutors.DatarouterSchemaUpdateScheduler executor) {
        this.datarouterProperties = datarouterProperties;
        this.adminEmailService = adminEmailService;
        this.executor = executor;
        this.futures = Collections.synchronizedList(new ArrayList());
        this.existingTableNamesByClient = new ConcurrentHashMap<ClientId, Lazy<List<String>>>();
        executor.scheduleWithFixedDelay(this::gatherSchemaUpdates, 0L, 10L, TimeUnit.SECONDS);
    }

    public Future<Optional<SchemaUpdateResult>> queueNodeForSchemaUpdate(ClientId clientId, PhysicalNode<?, ?, ?> node) {
        Lazy existingTableNames = this.existingTableNamesByClient.computeIfAbsent(clientId, this::lazyFetchExistingTables);
        Future<Optional<SchemaUpdateResult>> future = this.executor.submit(this.makeSchemaUpdateCallable(clientId, (Lazy<List<String>>)existingTableNames, node));
        this.futures.add(future);
        return future;
    }

    protected abstract Callable<Optional<SchemaUpdateResult>> makeSchemaUpdateCallable(ClientId var1, Lazy<List<String>> var2, PhysicalNode<?, ?, ?> var3);

    private void gatherSchemaUpdates() {
        this.gatherSchemaUpdates(false);
    }

    public synchronized void gatherSchemaUpdates(boolean wait) {
        boolean shouldNotify = true;
        HashMap<ClientId, List<String>> printedSchemaUpdates = new HashMap<ClientId, List<String>>();
        Iterator<Future<Optional<SchemaUpdateResult>>> futureIterator = this.futures.iterator();
        MutableString oneStartupBlockReason = new MutableString("");
        while (futureIterator.hasNext()) {
            Future<Optional<SchemaUpdateResult>> future = futureIterator.next();
            if (wait || future.isDone()) {
                try {
                    Optional<SchemaUpdateResult> optional = future.get();
                    if (optional.isEmpty()) continue;
                    printedSchemaUpdates.computeIfAbsent(optional.get().clientId, $ -> new ArrayList()).add(optional.get().ddl);
                    optional.get().startupBlockReason.ifPresent(arg_0 -> ((MutableString)oneStartupBlockReason).set(arg_0));
                }
                catch (InterruptedException | ExecutionException e) {
                    logger.error("", (Throwable)e);
                    throw new RuntimeException(e);
                }
                futureIterator.remove();
                continue;
            }
            shouldNotify = false;
        }
        if (shouldNotify) {
            this.sendEmail(printedSchemaUpdates);
        }
        if (!oneStartupBlockReason.getString().isEmpty()) {
            logger.error(oneStartupBlockReason.getString());
            throw new RuntimeException(oneStartupBlockReason.getString());
        }
    }

    private void sendEmail(Map<ClientId, List<String>> printedSchemaUpdates) {
        if (printedSchemaUpdates.isEmpty()) {
            return;
        }
        for (Map.Entry<ClientId, List<String>> clientAndDdls : printedSchemaUpdates.entrySet()) {
            String subject = "SchemaUpdate request on " + clientAndDdls.getKey().getName() + " from " + this.datarouterProperties.getEnvironment();
            StringBuilder body = new StringBuilder();
            for (String update : clientAndDdls.getValue()) {
                body.append(String.valueOf(update) + "\n\n");
            }
            this.sendEmail(this.datarouterProperties.getAdministratorEmail(), this.adminEmailService.getAdministratorEmailAddressesCsv(), subject, body.toString());
        }
    }

    protected abstract void sendEmail(String var1, String var2, String var3, String var4);

    private Lazy<List<String>> lazyFetchExistingTables(ClientId clientId) {
        return Lazy.of(() -> this.fetchExistingTables(clientId));
    }

    protected abstract List<String> fetchExistingTables(ClientId var1);
}

