/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime.standalone;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.HerderRequest;
import org.apache.kafka.connect.runtime.RestartPlan;
import org.apache.kafka.connect.runtime.RestartRequest;
import org.apache.kafka.connect.runtime.SessionKey;
import org.apache.kafka.connect.runtime.SinkConnectorConfig;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
import org.apache.kafka.connect.runtime.rest.InternalRequestSignature;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.ConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandaloneHerder
extends AbstractHerder {
    /** Logger shared by the herder and its nested classes. */
    private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
    /** Supplies the sequence number given to each {@link StandaloneHerderRequest} created for a delayed restart. */
    private final AtomicLong requestSeqNum = new AtomicLong();
    /** Single-threaded scheduled executor (created in the constructor) that runs deferred herder work; shut down in {@link #stop()}. */
    private final ScheduledExecutorService requestExecutorService;
    /** Latest configuration snapshot; starts empty and is replaced by {@code ConfigUpdateListener} while holding this herder's monitor. */
    private ClusterConfigState configState = ClusterConfigState.EMPTY;

    /**
     * Creates a herder that keeps its status and configuration in memory.
     *
     * <p>The worker id is taken from {@code worker.workerId()} and the config store is built
     * around {@code worker.configTransformer()}.
     *
     * @param worker the worker that runs connectors and tasks
     * @param kafkaClusterId id of the Kafka cluster, passed through to the base herder
     * @param connectorClientConfigOverridePolicy override policy, passed through to the base herder
     */
    public StandaloneHerder(Worker worker, String kafkaClusterId, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        this(
            worker,
            worker.workerId(),
            kafkaClusterId,
            new MemoryStatusBackingStore(),
            new MemoryConfigBackingStore(worker.configTransformer()),
            connectorClientConfigOverridePolicy);
    }

    StandaloneHerder(Worker worker, String workerId, String kafkaClusterId, StatusBackingStore statusBackingStore, MemoryConfigBackingStore configBackingStore, ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
        super(worker, workerId, kafkaClusterId, statusBackingStore, configBackingStore, connectorClientConfigOverridePolicy);
        this.requestExecutorService = Executors.newSingleThreadScheduledExecutor();
        configBackingStore.setUpdateListener(new ConfigUpdateListener());
    }

    /**
     * Starts the herder: starts the base-class services and then marks the herder as running.
     * Holds this herder's monitor for the duration of the call.
     */
    @Override
    public synchronized void start() {
        log.info("Herder starting");
        startServices();
        running = true;
        log.info("Herder started");
    }

    /**
     * Stops the herder.
     *
     * <p>The request executor is shut down first and given up to 30 seconds to drain; if it
     * does not terminate in time, or the wait is interrupted, the remaining work is cancelled
     * with {@code shutdownNow()}. Afterwards every connector's tasks are removed, the connector
     * itself is stopped, and the base-class services are stopped.
     *
     * <p>If the calling thread is interrupted while waiting for the executor, the interrupt is
     * not lost: the thread's interrupt status is restored once the rest of the shutdown has run,
     * so that the blocking stop calls below are not cut short by it.
     */
    @Override
    public synchronized void stop() {
        log.info("Herder stopping");
        boolean interrupted = false;
        requestExecutorService.shutdown();
        try {
            if (!requestExecutorService.awaitTermination(30L, TimeUnit.SECONDS)) {
                requestExecutorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            // Do not leave queued requests behind to run after the herder has stopped.
            requestExecutorService.shutdownNow();
            interrupted = true;
        }
        try {
            for (String connName : connectors()) {
                removeConnectorTasks(connName);
                worker.stopAndAwaitConnector(connName);
            }
            stopServices();
            running = false;
            log.info("Herder stopped");
        } finally {
            if (interrupted) {
                // Re-assert the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Returns the herder generation, which this implementation always reports as {@code 0}.
     *
     * @return {@code 0}
     */
    @Override
    public int generation() {
        final int fixedGeneration = 0;
        return fixedGeneration;
    }

    /**
     * Reports the current connector names to {@code callback}.
     *
     * @param callback completed immediately, without an error, with the result of {@code connectors()}
     */
    @Override
    public synchronized void connectors(Callback<Collection<String>> callback) {
        Collection<String> connectorNames = connectors();
        callback.onCompletion(null, connectorNames);
    }

    /**
     * Looks up the info for one connector and hands it to {@code callback}.
     *
     * @param connName name of the connector to look up
     * @param callback receives the info, or a {@link NotFoundException} when the lookup yields {@code null}
     */
    @Override
    public synchronized void connectorInfo(String connName, Callback<ConnectorInfo> callback) {
        ConnectorInfo info = connectorInfo(connName);
        if (info != null) {
            callback.onCompletion(null, info);
        } else {
            callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null);
        }
    }

    /**
     * Builds a {@link ConnectorInfo} from the current config snapshot.
     *
     * @param connector name of the connector
     * @return the info (name, raw config, task ids, connector type), or {@code null} if the
     *         snapshot does not contain the connector
     */
    private synchronized ConnectorInfo createConnectorInfo(String connector) {
        if (configState.contains(connector)) {
            Map<String, String> rawConfig = configState.rawConnectorConfig(connector);
            List<ConnectorTaskId> taskIds = configState.tasks(connector);
            return new ConnectorInfo(connector, rawConfig, taskIds, connectorTypeForClass(rawConfig.get("connector.class")));
        }
        return null;
    }

    /**
     * Returns the raw connector configuration held in the current snapshot.
     *
     * @param connName name of the connector
     * @return whatever {@code configState.rawConnectorConfig(connName)} yields
     */
    @Override
    protected synchronized Map<String, String> rawConfig(String connName) {
        Map<String, String> raw = configState.rawConnectorConfig(connName);
        return raw;
    }

    /**
     * Deletes a connector: stops and removes its tasks, stops the connector, removes its
     * configuration from the backing store and records the deletion.
     *
     * @param connName name of the connector to delete
     * @param callback receives a {@code Created} with {@code created == false} and a {@code null}
     *                 result on success, a {@link NotFoundException} if the connector is not in
     *                 the current snapshot, or the {@link ConnectException} raised while deleting
     */
    @Override
    public synchronized void deleteConnectorConfig(String connName, Callback<Herder.Created<ConnectorInfo>> callback) {
        try {
            if (!configState.contains(connName)) {
                callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
                return;
            }
            removeConnectorTasks(connName);
            worker.stopAndAwaitConnector(connName);
            configBackingStore.removeConnectorConfig(connName);
            onDeletion(connName);
            // The type argument must match the callback's Created<ConnectorInfo>; Created<Object> is not assignable to it.
            callback.onCompletion(null, new Herder.Created<>(false, null));
        } catch (ConnectException e) {
            callback.onCompletion(e, null);
        }
    }

    /**
     * Validates {@code config} and, when validation completes without an error, queues the
     * actual create/update on the request executor.
     *
     * @param connName name of the connector
     * @param config proposed connector configuration
     * @param allowReplace whether an existing connector of the same name may be replaced
     * @param callback receives the validation error, any {@link Throwable} thrown while starting
     *                 validation, or the outcome of the queued update
     */
    @Override
    public synchronized void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Herder.Created<ConnectorInfo>> callback) {
        try {
            validateConnectorConfig(config, (error, configInfos) -> {
                if (error == null) {
                    requestExecutorService.submit(() -> putConnectorConfig(connName, config, allowReplace, callback, configInfos));
                } else {
                    callback.onCompletion(error, null);
                }
            });
        } catch (Throwable failure) {
            callback.onCompletion(failure, null);
        }
    }

    /**
     * Applies an already-validated connector configuration.
     *
     * <p>If {@code configInfos} carries errors they are reported through {@code callback} by
     * {@code maybeAddConfigErrors} and nothing else happens. Otherwise an existing connector is
     * stopped (only if {@code allowReplace} is set), the config is written to the backing store,
     * the connector is started, and finally its tasks are updated on the request executor.
     *
     * <p>Failures in the task-update step are delivered to {@code callback}; without that, an
     * exception thrown inside the submitted task would be captured by the discarded
     * {@code Future} and the caller would never be completed.
     *
     * @param connName name of the connector
     * @param config connector configuration to store
     * @param allowReplace whether an existing connector of the same name may be replaced
     * @param callback completed exactly once with the result or the failure
     * @param configInfos validation result for {@code config}
     */
    private synchronized void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Herder.Created<ConnectorInfo>> callback, ConfigInfos configInfos) {
        try {
            if (maybeAddConfigErrors(configInfos, callback)) {
                return;
            }
            final boolean created;
            if (configState.contains(connName)) {
                if (!allowReplace) {
                    callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null);
                    return;
                }
                worker.stopAndAwaitConnector(connName);
                created = false;
            } else {
                created = true;
            }
            configBackingStore.putConnectorConfig(connName, config);
            startConnector(connName, (error, result) -> {
                if (error != null) {
                    callback.onCompletion(error, null);
                    return;
                }
                requestExecutorService.submit(() -> {
                    ConnectorInfo info;
                    try {
                        updateConnectorTasks(connName);
                        info = createConnectorInfo(connName);
                    } catch (Throwable t) {
                        // Surface the failure instead of losing it in the unobserved Future.
                        callback.onCompletion(t, null);
                        return;
                    }
                    // Kept outside the try so a throwing callback is never invoked a second time.
                    callback.onCompletion(null, new Herder.Created<>(created, info));
                });
            });
        } catch (Throwable t) {
            callback.onCompletion(t, null);
        }
    }

    /**
     * Recomputes the tasks of {@code connName} if the worker knows a connector by that name;
     * otherwise logs an error and does nothing.
     *
     * @param connName name of the connector whose tasks should be reconfigured
     */
    @Override
    public synchronized void requestTaskReconfiguration(String connName) {
        if (worker.connectorNames().contains(connName)) {
            updateConnectorTasks(connName);
        } else {
            log.error("Task that requested reconfiguration does not exist: {}", connName);
        }
    }

    /**
     * Collects a {@link TaskInfo} (task id plus raw task config) for every task of a connector.
     *
     * @param connName name of the connector
     * @param callback receives the list, or a {@link NotFoundException} if the connector is not
     *                 in the current snapshot
     */
    @Override
    public synchronized void taskConfigs(String connName, Callback<List<TaskInfo>> callback) {
        if (configState.contains(connName)) {
            List<TaskInfo> taskInfos = new ArrayList<>();
            for (ConnectorTaskId id : configState.tasks(connName)) {
                taskInfos.add(new TaskInfo(id, configState.rawTaskConfig(id)));
            }
            callback.onCompletion(null, taskInfos);
        } else {
            callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
        }
    }

    /**
     * Not supported by this herder.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature) {
        final String reason = "Kafka Connect in standalone mode does not support externally setting task configurations.";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Stops a single task and starts it again with the configuration from the current snapshot.
     *
     * <p>The callback is completed exactly once. When the connector or the task is unknown the
     * method reports {@link NotFoundException} and returns without touching the worker.
     *
     * @param taskId id of the task to restart
     * @param cb receives {@code null} on success, a {@link NotFoundException} if the connector or
     *           task is not in the current snapshot, or a {@link ConnectException} if the worker
     *           fails to start the task
     */
    @Override
    public synchronized void restartTask(ConnectorTaskId taskId, Callback<Void> cb) {
        if (!configState.contains(taskId.connector())) {
            cb.onCompletion(new NotFoundException("Connector " + taskId.connector() + " not found", null), null);
            return;
        }
        Map<String, String> taskConfigProps = configState.taskConfig(taskId);
        if (taskConfigProps == null) {
            cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
            return;
        }
        Map<String, String> connConfigProps = configState.connectorConfig(taskId.connector());
        TargetState targetState = configState.targetState(taskId.connector());
        worker.stopAndAwaitTask(taskId);
        if (worker.startTask(taskId, configState, connConfigProps, taskConfigProps, this, targetState)) {
            cb.onCompletion(null, null);
        } else {
            cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
        }
    }

    /**
     * Stops a connector and starts it again.
     *
     * <p>When the connector is not in the current snapshot the method reports
     * {@link NotFoundException} and returns immediately, so the callback is completed only once
     * and the worker is not asked to stop or start an unknown connector.
     *
     * @param connName name of the connector to restart
     * @param cb receives {@code null} on success or the error from the lookup or the start
     */
    @Override
    public synchronized void restartConnector(String connName, Callback<Void> cb) {
        if (!configState.contains(connName)) {
            cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
            return;
        }
        worker.stopAndAwaitConnector(connName);
        startConnector(connName, (error, result) -> cb.onCompletion(error, null));
    }

    /**
     * Schedules a connector restart on the request executor after a delay.
     *
     * @param delayMs delay before the restart runs, in milliseconds
     * @param connName name of the connector to restart
     * @param cb callback passed on to the restart
     * @return a request handle carrying the next sequence number and the scheduled future
     */
    @Override
    public synchronized HerderRequest restartConnector(long delayMs, String connName, Callback<Void> cb) {
        Runnable restart = () -> restartConnector(connName, cb);
        ScheduledFuture<?> scheduled = requestExecutorService.schedule(restart, delayMs, TimeUnit.MILLISECONDS);
        long seq = requestSeqNum.incrementAndGet();
        return new StandaloneHerderRequest(seq, scheduled);
    }

    /**
     * Restarts a connector and/or a subset of its tasks according to the plan built from
     * {@code request}.
     *
     * <p>Everything selected by the plan is stopped first (and each stop is recorded through
     * {@code onRestart}); only then is anything started again.
     *
     * @param request the restart request; its connector name selects the connector
     * @param cb receives the plan's {@code restartConnectorStateInfo()} on completion, or a
     *           {@link NotFoundException} when the connector is not in the current snapshot or
     *           no restart plan could be built
     */
    @Override
    public synchronized void restartConnectorAndTasks(RestartRequest request, Callback<ConnectorStateInfo> cb) {
        String connectorName = request.connectorName();
        if (!this.configState.contains(connectorName)) {
            cb.onCompletion(new NotFoundException("Unknown connector: " + connectorName, null), null);
            return;
        }
        Optional<RestartPlan> maybePlan = this.buildRestartPlan(request);
        if (!maybePlan.isPresent()) {
            cb.onCompletion(new NotFoundException("Status for connector " + connectorName + " not found", null), null);
            return;
        }
        RestartPlan plan = maybePlan.get();
        log.info("Received {}", (Object)plan);
        // Phase 1: stop whatever the plan selects, recording each restart.
        if (plan.shouldRestartConnector()) {
            this.worker.stopAndAwaitConnector(connectorName);
            this.onRestart(connectorName);
        }
        if (plan.shouldRestartTasks()) {
            this.worker.stopAndAwaitTasks(plan.taskIdsToRestart());
            plan.taskIdsToRestart().forEach(this::onRestart);
        }
        // Phase 2: start the connector again; the outcome is only logged, it is not passed to cb.
        if (plan.shouldRestartConnector()) {
            log.debug("Restarting connector '{}'", (Object)connectorName);
            this.startConnector(connectorName, (error, targetState) -> {
                if (error == null) {
                    log.info("Connector '{}' restart successful", (Object)connectorName);
                } else {
                    log.error("Connector '{}' restart failed", (Object)connectorName, (Object)error);
                }
            });
        }
        // Phase 3: start the selected tasks again from the current snapshot.
        if (plan.shouldRestartTasks()) {
            log.debug("Restarting {} of {} tasks for {}", plan.restartTaskCount(), plan.totalTaskCount(), request);
            this.createConnectorTasks(connectorName, plan.taskIdsToRestart());
            log.debug("Restarted {} of {} tasks for {} as requested", plan.restartTaskCount(), plan.totalTaskCount(), request);
        }
        log.info("Completed {}", (Object)plan);
        cb.onCompletion(null, plan.restartConnectorStateInfo());
    }

    /**
     * Asks the worker to start a connector using the config and target state from the current
     * snapshot.
     *
     * @param connName name of the connector to start
     * @param onStart callback handed to the worker's {@code startConnector}
     */
    private void startConnector(String connName, Callback<TargetState> onStart) {
        Map<String, String> connectorProps = configState.connectorConfig(connName);
        TargetState desiredState = configState.targetState(connName);
        HerderConnectorContext context = new HerderConnectorContext(this, connName);
        worker.startConnector(connName, connectorProps, context, this, desiredState, onStart);
    }

    /**
     * Asks the worker for fresh task configs for a connector.
     *
     * <p>The connector config is wrapped in a {@link SinkConnectorConfig} when the worker reports
     * a sink connector, otherwise in a {@link SourceConnectorConfig}.
     *
     * @param connName name of the connector
     * @return the task configs returned by {@code worker.connectorTaskConfigs}
     */
    private List<Map<String, String>> recomputeTaskConfigs(String connName) {
        Map<String, String> connectorProps = configState.connectorConfig(connName);
        ConnectorConfig typedConfig;
        if (worker.isSinkConnector(connName)) {
            typedConfig = new SinkConnectorConfig(plugins(), connectorProps);
        } else {
            typedConfig = new SourceConnectorConfig(plugins(), connectorProps, worker.isTopicCreationEnabled());
        }
        return worker.connectorTaskConfigs(connName, typedConfig);
    }

    /**
     * Starts every task that the current snapshot lists for {@code connName}.
     *
     * @param connName name of the connector whose tasks are started
     */
    private void createConnectorTasks(String connName) {
        createConnectorTasks(connName, configState.tasks(connName));
    }

    /**
     * Starts the given tasks on the worker, using the connector's target state and config from
     * the current snapshot. The boolean result of each {@code startTask} call is ignored.
     *
     * @param connName name of the owning connector
     * @param taskIds ids of the tasks to start
     */
    private void createConnectorTasks(String connName, Collection<ConnectorTaskId> taskIds) {
        TargetState desiredState = configState.targetState(connName);
        Map<String, String> connectorProps = configState.connectorConfig(connName);
        taskIds.forEach(id -> {
            Map<String, String> taskProps = configState.taskConfig(id);
            worker.startTask(id, configState, connectorProps, taskProps, this, desiredState);
        });
    }

    /**
     * Stops all tasks of a connector, removes their configs from the backing store and records
     * each deletion. Does nothing when the snapshot lists no tasks for the connector.
     *
     * @param connName name of the connector whose tasks are removed
     */
    private void removeConnectorTasks(String connName) {
        List<ConnectorTaskId> existing = configState.tasks(connName);
        if (existing.isEmpty()) {
            return;
        }
        worker.stopAndAwaitTasks(existing);
        configBackingStore.removeTaskConfigs(connName);
        existing.forEach(this::onDeletion);
    }

    /**
     * Recomputes a connector's task configs and, if they differ from the ones in the current
     * snapshot, replaces the tasks: old tasks are removed, the new configs are stored in raw
     * (reverse-transformed) form, and the tasks are started again.
     *
     * <p>Skipped with an info log when the worker reports that the connector is not running.
     *
     * @param connName name of the connector
     */
    private void updateConnectorTasks(String connName) {
        if (!worker.isRunning(connName)) {
            log.info("Skipping update of connector {} since it is not running", connName);
            return;
        }
        List<Map<String, String>> recomputed = recomputeTaskConfigs(connName);
        List<Map<String, String>> current = configState.allTaskConfigs(connName);
        if (recomputed.equals(current)) {
            return;
        }
        removeConnectorTasks(connName);
        List<Map<String, String>> rawConfigs = StandaloneHerder.reverseTransform(connName, configState, recomputed);
        configBackingStore.putTaskConfigs(connName, rawConfigs);
        createConnectorTasks(connName);
    }

    /**
     * Reports the per-task configs of a connector as built by {@code buildTasksConfig}.
     *
     * @param connName name of the connector
     * @param callback always receives the built map; when that map is empty it is accompanied by
     *                 a {@link NotFoundException}
     */
    @Override
    public void tasksConfig(String connName, Callback<Map<ConnectorTaskId, Map<String, String>>> callback) {
        Map<ConnectorTaskId, Map<String, String>> configsByTask = buildTasksConfig(connName);
        NotFoundException notFound = configsByTask.isEmpty()
            ? new NotFoundException("Connector " + connName + " not found")
            : null;
        callback.onCompletion(notFound, configsByTask);
    }

    /**
     * Handle for a request scheduled on the herder's executor. Identity (equality and hash code)
     * is defined solely by the sequence number.
     */
    static class StandaloneHerderRequest
    implements HerderRequest {
        private final long seq;
        private final ScheduledFuture<?> future;

        /**
         * @param seq sequence number identifying this request
         * @param future the scheduled work that {@link #cancel()} acts on
         */
        public StandaloneHerderRequest(long seq, ScheduledFuture<?> future) {
            this.seq = seq;
            this.future = future;
        }

        /** Cancels the scheduled work without interrupting it if it is already running. */
        @Override
        public void cancel() {
            future.cancel(false);
        }

        /** Two requests are equal when they carry the same sequence number. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o instanceof StandaloneHerderRequest) {
                return seq == ((StandaloneHerderRequest) o).seq;
            }
            return false;
        }

        /** Hash code derived from the sequence number only, consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hash(seq);
        }
    }

    private class ConfigUpdateListener
    implements ConfigBackingStore.UpdateListener {
        private ConfigUpdateListener() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorConfigRemove(String connector) {
            StandaloneHerder standaloneHerder = StandaloneHerder.this;
            synchronized (standaloneHerder) {
                StandaloneHerder.this.configState = StandaloneHerder.this.configBackingStore.snapshot();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorConfigUpdate(String connector) {
            StandaloneHerder standaloneHerder = StandaloneHerder.this;
            synchronized (standaloneHerder) {
                StandaloneHerder.this.configState = StandaloneHerder.this.configBackingStore.snapshot();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
            StandaloneHerder standaloneHerder = StandaloneHerder.this;
            synchronized (standaloneHerder) {
                StandaloneHerder.this.configState = StandaloneHerder.this.configBackingStore.snapshot();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void onConnectorTargetStateChange(String connector) {
            StandaloneHerder standaloneHerder = StandaloneHerder.this;
            synchronized (standaloneHerder) {
                StandaloneHerder.this.configState = StandaloneHerder.this.configBackingStore.snapshot();
                TargetState targetState = StandaloneHerder.this.configState.targetState(connector);
                StandaloneHerder.this.worker.setTargetState(connector, targetState, (error, newState) -> {
                    if (error != null) {
                        log.error("Failed to transition connector {} to target state {}", new Object[]{connector, targetState, error});
                        return;
                    }
                    if (newState == TargetState.STARTED) {
                        StandaloneHerder.this.requestExecutorService.submit(() -> StandaloneHerder.this.updateConnectorTasks(connector));
                    }
                });
            }
        }

        @Override
        public void onSessionKeyUpdate(SessionKey sessionKey) {
        }

        @Override
        public void onRestartRequest(RestartRequest restartRequest) {
        }
    }
}

