/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorFactory;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter;
import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.WorkerConnector;
import org.apache.kafka.connect.runtime.WorkerSinkTask;
import org.apache.kafka.connect.runtime.WorkerSourceTask;
import org.apache.kafka.connect.runtime.WorkerTask;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Container for the connectors and tasks that are running in this process.
 *
 * <p>The worker instantiates connectors and tasks through the {@link ConnectorFactory}, keeps
 * them in two registries (connectors by name, tasks by {@link ConnectorTaskId}), submits tasks to
 * an internal cached thread pool and stops them again on request.
 *
 * <p>The registries are plain {@link HashMap}s and this class performs no synchronization of its
 * own. NOTE(review): callers are presumably expected to serialize access to a worker instance;
 * confirm against the herder implementations.
 */
public class Worker {
    private static final Logger log = LoggerFactory.getLogger(Worker.class);

    /** Worker config key holding the total time budget (ms) for stopping tasks gracefully. */
    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS = "task.shutdown.graceful.timeout.ms";

    private final ExecutorService executor;
    private final Time time;
    private final String workerId;
    private final ConnectorFactory connectorFactory;
    private final WorkerConfig config;
    private final Converter defaultKeyConverter;
    private final Converter defaultValueConverter;
    private final Converter internalKeyConverter;
    private final Converter internalValueConverter;
    private final OffsetBackingStore offsetBackingStore;
    private final Map<String, Object> producerProps;
    /** Connectors currently owned by this worker, keyed by connector name. */
    private final Map<String, WorkerConnector> connectors = new HashMap<String, WorkerConnector>();
    /** Tasks currently owned by this worker, keyed by task id. */
    private final Map<ConnectorTaskId, WorkerTask> tasks = new HashMap<ConnectorTaskId, WorkerTask>();
    /** Created in {@link #start()}; {@code null} until then. */
    private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;

    /**
     * Creates a worker, instantiating and configuring the default and internal converters from
     * {@code config}, configuring the offset backing store, and preparing the producer properties
     * used for every source task.
     *
     * @param workerId identifier reported by {@link #workerId()}
     * @param time clock used for shutdown deadlines and handed to the worker tasks
     * @param connectorFactory factory used to instantiate connectors and tasks
     * @param config worker configuration
     * @param offsetBackingStore store for source offsets; it is configured here and started in
     *     {@link #start()}
     */
    public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
        this.executor = Executors.newCachedThreadPool();
        this.workerId = workerId;
        this.time = time;
        this.connectorFactory = connectorFactory;
        this.config = config;

        // Second argument of configure(): true for key converters, false for value converters.
        this.defaultKeyConverter = config.getConfiguredInstance("key.converter", Converter.class);
        this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
        this.defaultValueConverter = config.getConfiguredInstance("value.converter", Converter.class);
        this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
        this.internalKeyConverter = config.getConfiguredInstance("internal.key.converter", Converter.class);
        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
        this.internalValueConverter = config.getConfiguredInstance("internal.value.converter", Converter.class);
        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);

        this.offsetBackingStore = offsetBackingStore;
        this.offsetBackingStore.configure(config);

        // Producer defaults for source tasks: byte[] serializers, acks=all, one in-flight
        // request, and effectively unbounded retries/timeouts.
        this.producerProps = new HashMap<String, Object>();
        this.producerProps.put("bootstrap.servers", Utils.join(config.getList("bootstrap.servers"), ","));
        this.producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        this.producerProps.put("request.timeout.ms", Integer.toString(Integer.MAX_VALUE));
        this.producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
        this.producerProps.put("max.block.ms", Long.toString(Long.MAX_VALUE));
        this.producerProps.put("acks", "all");
        this.producerProps.put("max.in.flight.requests.per.connection", "1");
        // Applied last so that user-supplied "producer."-prefixed settings win over the defaults.
        this.producerProps.putAll(config.originalsWithPrefix("producer."));
    }

    /** Starts the offset backing store and creates the source task offset committer. */
    public void start() {
        log.info("Worker starting");
        offsetBackingStore.start();
        sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(config);
        log.info("Worker started");
    }

    /**
     * Stops the worker. Any connectors or tasks still registered are stopped first (with a
     * warning, since the herder is expected to have stopped them already), then the offset
     * committer is closed with whatever remains of the graceful shutdown budget, and finally the
     * offset backing store is stopped.
     */
    public void stop() {
        log.info("Worker stopping");
        long started = time.milliseconds();
        long limit = started + config.getLong(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS);

        if (!connectors.isEmpty()) {
            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", connectors.keySet());
            stopConnectors();
        }
        if (!tasks.isEmpty()) {
            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", tasks.keySet());
            stopAndAwaitTasks();
        }

        // The budget may already be exhausted by the steps above; never pass a negative timeout.
        long timeoutMs = Math.max(0L, limit - time.milliseconds());
        sourceTaskOffsetCommitter.close(timeoutMs);
        offsetBackingStore.stop();
        log.info("Worker stopped");
    }

    /**
     * Instantiates, initializes and registers a connector, moving it to {@code initialState}.
     *
     * @param connName name to register the connector under
     * @param connProps connector configuration; must contain {@code connector.class}
     * @param ctx context passed to the connector wrapper
     * @param statusListener listener notified via {@code onFailure} if startup fails
     * @param initialState state the connector is transitioned to after initialization
     * @return {@code true} if the connector was created and registered; {@code false} if any step
     *     failed (the failure is logged and reported to {@code statusListener})
     * @throws ConnectException if a connector with this name is already registered
     */
    public boolean startConnector(String connName, Map<String, String> connProps, ConnectorContext ctx, ConnectorStatus.Listener statusListener, TargetState initialState) {
        if (connectors.containsKey(connName)) {
            throw new ConnectException("Connector with name " + connName + " already exists");
        }

        WorkerConnector workerConnector;
        try {
            ConnectorConfig connConfig = new ConnectorConfig(connProps);
            String connClass = connConfig.getString("connector.class");
            log.info("Creating connector {} of type {}", connName, connClass);
            Connector connector = connectorFactory.newConnector(connClass);
            workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
            log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
            workerConnector.initialize(connConfig);
            workerConnector.transitionTo(initialState);
        } catch (Throwable t) {
            // The trailing Throwable is logged with its stack trace by SLF4J.
            log.error("Failed to start connector {}", connName, t);
            statusListener.onFailure(connName, t);
            return false;
        }

        connectors.put(connName, workerConnector);
        log.info("Finished creating connector {}", connName);
        return true;
    }

    /**
     * Reports whether the named connector is a sink connector.
     *
     * @param connName name of a connector registered with this worker
     * @return the result of {@code WorkerConnector.isSinkConnector()} for that connector
     * @throws ConnectException if no connector with this name is registered
     */
    public boolean isSinkConnector(String connName) {
        WorkerConnector workerConnector = connectors.get(connName);
        if (workerConnector == null) {
            // Same failure mode as connectorTaskConfigs() instead of a bare NullPointerException.
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }
        return workerConnector.isSinkConnector();
    }

    /**
     * Asks the named connector for its task configurations and decorates each of them with the
     * task class name and, when given, the sink topics.
     *
     * @param connName name of a connector registered with this worker
     * @param maxTasks upper bound passed to {@code Connector.taskConfigs}
     * @param sinkTopics topics to store (comma-joined) under {@code topics} in every task config;
     *     {@code null} to leave {@code topics} untouched
     * @return a new list of mutable copies of the connector's task configs
     * @throws ConnectException if no connector with this name is registered
     */
    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
        log.trace("Reconfiguring connector tasks for {}", connName);
        WorkerConnector workerConnector = connectors.get(connName);
        if (workerConnector == null) {
            throw new ConnectException("Connector " + connName + " not found in this worker.");
        }

        Connector connector = workerConnector.connector();
        String taskClassName = connector.taskClass().getName();
        List<Map<String, String>> result = new ArrayList<Map<String, String>>();
        for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
            // Copy so the connector's own maps are never modified.
            Map<String, String> taskConfig = new HashMap<String, String>(taskProps);
            taskConfig.put("task.class", taskClassName);
            if (sinkTopics != null) {
                taskConfig.put("topics", Utils.join(sinkTopics, ","));
            }
            result.add(taskConfig);
        }
        return result;
    }

    /** Stops every connector currently registered with this worker. */
    public void stopConnectors() {
        // Snapshot the names: stopConnector() removes entries from the registry while we iterate.
        stopConnectors(new HashSet<String>(connectors.keySet()));
    }

    /**
     * Stops the given connectors.
     *
     * @param connectors names of the connectors to stop
     * @return the names for which {@link #stopConnector(String)} returned {@code true}
     */
    public Collection<String> stopConnectors(Collection<String> connectors) {
        List<String> stopped = new ArrayList<String>(connectors.size());
        for (String connector : connectors) {
            if (stopConnector(connector)) {
                stopped.add(connector);
            }
        }
        return stopped;
    }

    /**
     * Shuts down the named connector and removes it from this worker.
     *
     * @param connName name of the connector to stop
     * @return {@code true} if the connector was registered and has been shut down; {@code false}
     *     if this worker does not own a connector with that name
     */
    public boolean stopConnector(String connName) {
        log.info("Stopping connector {}", connName);
        WorkerConnector connector = connectors.get(connName);
        if (connector == null) {
            log.warn("Ignoring stop request for unowned connector {}", connName);
            return false;
        }
        connector.shutdown();
        connectors.remove(connName);
        log.info("Stopped connector {}", connName);
        return true;
    }

    /**
     * Returns the names of the connectors registered with this worker.
     *
     * @return the key set of the internal registry; this is a live view, so copy it before
     *     starting or stopping connectors while iterating
     */
    public Set<String> connectorNames() {
        return connectors.keySet();
    }

    /**
     * @param connName connector name
     * @return {@code true} if the connector is registered with this worker and reports itself as
     *     running
     */
    public boolean isRunning(String connName) {
        WorkerConnector connector = connectors.get(connName);
        return connector != null && connector.isRunning();
    }

    /**
     * Instantiates a task, wraps it in the matching worker task, submits it to the executor and
     * registers it. Source tasks are additionally scheduled with the offset committer.
     *
     * <p>Converters declared in the connector configuration take precedence; when the connector
     * configuration yields none, the worker-level defaults are used.
     *
     * @param id id to register the task under
     * @param connProps configuration of the owning connector (used for converter overrides)
     * @param taskProps task configuration; must contain {@code task.class}
     * @param statusListener listener notified via {@code onFailure} if startup fails
     * @param initialState initial target state handed to the worker task
     * @return {@code true} if the task was created, submitted and registered; {@code false} if
     *     creation or initialization failed (the failure is logged and reported to
     *     {@code statusListener})
     * @throws ConnectException if a task with this id is already registered
     */
    public boolean startTask(ConnectorTaskId id, Map<String, String> connProps, Map<String, String> taskProps, TaskStatus.Listener statusListener, TargetState initialState) {
        log.info("Creating task {}", id);
        if (tasks.containsKey(id)) {
            throw new ConnectException("Task already exists in this worker: " + id);
        }

        WorkerTask workerTask;
        try {
            ConnectorConfig connConfig = new ConnectorConfig(connProps);
            TaskConfig taskConfig = new TaskConfig(taskProps);
            Class<? extends Task> taskClass = taskConfig.getClass("task.class").asSubclass(Task.class);
            Task task = connectorFactory.newTask(taskClass);
            log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());

            Converter keyConverter = connConfig.getConfiguredInstance("key.converter", Converter.class);
            if (keyConverter != null) {
                keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
            } else {
                keyConverter = defaultKeyConverter;
            }

            Converter valueConverter = connConfig.getConfiguredInstance("value.converter", Converter.class);
            if (valueConverter != null) {
                valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
            } else {
                valueConverter = defaultValueConverter;
            }

            workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
            workerTask.initialize(taskConfig);
        } catch (Throwable t) {
            // The trailing Throwable is logged with its stack trace by SLF4J.
            log.error("Failed to start task {}", id, t);
            statusListener.onFailure(id, t);
            return false;
        }

        executor.submit(workerTask);
        if (workerTask instanceof WorkerSourceTask) {
            sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
        }
        tasks.put(id, workerTask);
        return true;
    }

    /**
     * Wraps a task in a {@link WorkerSourceTask} or {@link WorkerSinkTask} depending on its type.
     * Source tasks get their own producer plus an offset reader and writer backed by the worker's
     * offset store and internal converters.
     *
     * @throws ConnectException if the task is neither a {@link SourceTask} nor a {@link SinkTask}
     */
    private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, Converter valueConverter) {
        if (task instanceof SourceTask) {
            OffsetStorageReaderImpl offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter);
            OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter);
            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(producerProps);
            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, time);
        }
        if (task instanceof SinkTask) {
            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter, valueConverter, time);
        }
        // The placeholder is required; without it SLF4J silently drops the task argument.
        log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", task);
        throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
    }

    /**
     * Stops a single task and waits for it to finish.
     *
     * @param id id of the task to stop
     * @return {@code true} if the task was owned by this worker and a stop was performed
     */
    public boolean stopAndAwaitTask(ConnectorTaskId id) {
        return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty();
    }

    /** Stops every task currently registered with this worker and waits for them to finish. */
    public void stopAndAwaitTasks() {
        // Snapshot the ids: awaiting a task removes it from the registry.
        stopAndAwaitTasks(new HashSet<ConnectorTaskId>(tasks.keySet()));
    }

    /**
     * Signals all of the given tasks to stop, then waits for them, sharing a single graceful
     * shutdown budget across the whole batch. Ids not owned by this worker are logged and
     * skipped; ids that appear more than once are handled once.
     *
     * @param ids ids of the tasks to stop
     * @return the ids of the tasks that were owned by this worker and have been stopped
     */
    public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
        List<ConnectorTaskId> stoppable = new ArrayList<ConnectorTaskId>(ids.size());
        Set<ConnectorTaskId> seen = new HashSet<ConnectorTaskId>();
        for (ConnectorTaskId taskId : ids) {
            if (!seen.add(taskId)) {
                // Duplicate id: stopping twice would later await a task that is already removed.
                continue;
            }
            WorkerTask task = tasks.get(taskId);
            if (task == null) {
                log.warn("Ignoring stop request for unowned task {}", taskId);
                continue;
            }
            stopTask(task);
            stoppable.add(taskId);
        }
        awaitStopTasks(stoppable);
        return stoppable;
    }

    /** Requests a task to stop; source tasks are first removed from the offset committer. */
    private void stopTask(WorkerTask task) {
        log.info("Stopping task {}", task.id());
        if (task instanceof WorkerSourceTask) {
            sourceTaskOffsetCommitter.remove(task.id());
        }
        task.stop();
    }

    /**
     * Waits for each of the given tasks to stop. All tasks share one deadline, so each wait only
     * gets the time that is still left (never less than zero).
     */
    private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
        long now = time.milliseconds();
        long deadline = now + config.getLong(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS);
        for (ConnectorTaskId id : ids) {
            WorkerTask task = tasks.get(id);
            if (task == null) {
                // Already removed (e.g. awaited earlier in this batch); nothing left to wait for.
                continue;
            }
            long remaining = Math.max(0L, deadline - time.milliseconds());
            awaitStopTask(task, remaining);
        }
    }

    /**
     * Waits up to {@code timeout} ms for a task to stop, cancels it if it does not, and removes
     * it from the registry in either case.
     */
    private void awaitStopTask(WorkerTask task, long timeout) {
        if (!task.awaitStop(timeout)) {
            log.error("Graceful stop of task {} failed.", task.id());
            task.cancel();
        }
        tasks.remove(task.id());
    }

    /**
     * Returns the ids of the tasks registered with this worker.
     *
     * @return the key set of the internal registry; this is a live view, so copy it before
     *     starting or stopping tasks while iterating
     */
    public Set<ConnectorTaskId> taskIds() {
        return tasks.keySet();
    }

    /** @return the converter configured from {@code internal.key.converter} */
    public Converter getInternalKeyConverter() {
        return internalKeyConverter;
    }

    /** @return the converter configured from {@code internal.value.converter} */
    public Converter getInternalValueConverter() {
        return internalValueConverter;
    }

    /** @return the factory this worker uses to instantiate connectors and tasks */
    public ConnectorFactory getConnectorFactory() {
        return connectorFactory;
    }

    /** @return the worker id supplied at construction time */
    public String workerId() {
        return workerId;
    }

    /**
     * Transitions the named connector (if this worker owns it) and every task of that connector
     * owned by this worker to the given target state.
     *
     * @param connName connector name
     * @param state target state to apply
     */
    public void setTargetState(String connName, TargetState state) {
        log.info("Setting connector {} state to {}", connName, state);

        WorkerConnector connector = connectors.get(connName);
        if (connector != null) {
            connector.transitionTo(state);
        }

        // Tasks may be hosted here even when the connector itself is not.
        for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
            if (taskEntry.getKey().connector().equals(connName)) {
                taskEntry.getValue().transitionTo(state);
            }
        }
    }
}

