package io.streamthoughts.jikkou.kafka.connect.control;

import io.streamthoughts.jikkou.annotation.AcceptsConfigProperty;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.config.ConfigProperty;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.ResourceCollector;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.error.JikkouRuntimeException;
import io.streamthoughts.jikkou.api.model.ObjectMeta;
import io.streamthoughts.jikkou.api.selector.ResourceSelector;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectConstants;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectExtensionConfig;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectLabels;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
import io.streamthoughts.jikkou.kafka.connect.internals.KafkaConnectUtils;
import io.streamthoughts.jikkou.kafka.connect.models.KafkaConnectorState;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnectorSpec;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnectorStatus;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AcceptsResource(type = V1KafkaConnector.class)
@AcceptsConfigProperty(name = Config.EXPAND_STATUS_CONFIG_NAME, description = Config.EXPAND_STATUS_CONFIG_DESCRIPTION, defaultValue = "false", type = Boolean.class, isRequired = false)
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/control/KafkaConnectorCollector.class */
public final class KafkaConnectorCollector implements ResourceCollector<V1KafkaConnector> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConnectorCollector.class);
    private static final String DEFAULT_CONNECTOR_TASKS_MAX = "1";
    private KafkaConnectExtensionConfig configuration;

    /* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/control/KafkaConnectorCollector$Config.class */
    public static class Config {
        public static final String EXPAND_STATUS_CONFIG_NAME = "expand-status";
        public static final String EXPAND_STATUS_CONFIG_DESCRIPTION = "Retrieves additional information about the status of the connector and its tasks.";
        public ConfigProperty<Boolean> EXPEND_STATUS = ConfigProperty.ofBoolean(EXPAND_STATUS_CONFIG_NAME).description(EXPAND_STATUS_CONFIG_DESCRIPTION).orElse(false);
        private final Configuration configuration;

        public Config(@NotNull Configuration configuration) {
            this.configuration = (Configuration) Objects.requireNonNull(configuration, "configuration must not be null");
        }

        public boolean expandStatus() {
            return ((Boolean) this.EXPEND_STATUS.evaluate(this.configuration)).booleanValue();
        }
    }

    public void configure(@NotNull Configuration configuration) throws ConfigException {
        configure(new KafkaConnectExtensionConfig(configuration));
    }

    public void configure(@NotNull KafkaConnectExtensionConfig kafkaConnectExtensionConfig) throws ConfigException {
        this.configuration = kafkaConnectExtensionConfig;
    }

    public List<V1KafkaConnector> listAll(@NotNull Configuration configuration, @NotNull List<ResourceSelector> list) {
        boolean expandStatus = new Config(configuration).expandStatus();
        return (List) this.configuration.getClusters().stream().flatMap(str -> {
            return listAll(str, expandStatus).stream();
        }).collect(Collectors.toList());
    }

    public List<V1KafkaConnector> listAll(String str, boolean z) {
        LinkedList linkedList = new LinkedList();
        KafkaConnectApi create = KafkaConnectApiFactory.create(this.configuration.getConfigForCluster(str).orElseThrow(() -> {
            return new JikkouRuntimeException("Cannot list connectors. Unknown Kafka Connect cluster '" + str + "'");
        }));
        try {
            for (String str2 : create.listConnectors()) {
                try {
                    linkedList.add(getConnectorAsync(str, str2, create, z).get());
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.error("Failed to get connector '{}' from connect cluster {}", new Object[]{str2, str, e});
                }
            }
            return linkedList;
        } finally {
            create.close();
        }
    }

    private static CompletableFuture<V1KafkaConnector> getConnectorAsync(String str, String str2, KafkaConnectApi kafkaConnectApi, boolean z) {
        return CompletableFuture.supplyAsync(() -> {
            return kafkaConnectApi.getConnectorConfig(str2);
        }).thenCombine((CompletionStage) CompletableFuture.supplyAsync(() -> {
            return kafkaConnectApi.getConnectorStatus(str2);
        }), (map, connectorStatusResponse) -> {
            return V1KafkaConnector.builder().withMetadata(ObjectMeta.builder().withName(str2).withLabel(KafkaConnectLabels.KAFKA_CONNECT_CLUSTER, str).build()).withSpec(V1KafkaConnectorSpec.builder().withConnectorClass((String) Optional.ofNullable(map.get(KafkaConnectConstants.CONNECTOR_CLASS_CONFIG)).map((v0) -> {
                return v0.toString();
            }).orElse(null)).withTasksMax(Integer.valueOf(Integer.parseInt((String) Optional.ofNullable(map.get(KafkaConnectConstants.CONNECTOR_TASKS_MAX_CONFIG)).map((v0) -> {
                return v0.toString();
            }).orElse(DEFAULT_CONNECTOR_TASKS_MAX)))).withConfig(KafkaConnectUtils.removeCommonConnectorConfig((Map<String, Object>) map)).withState(KafkaConnectorState.fromValue(connectorStatusResponse.connector().state())).build()).withStatus(z ? new V1KafkaConnectorStatus(connectorStatusResponse) : null).build();
        });
    }
}
