package io.streamthoughts.jikkou.kafka.connect.control;

import io.streamthoughts.jikkou.annotation.AcceptsReconciliationModes;
import io.streamthoughts.jikkou.annotation.AcceptsResource;
import io.streamthoughts.jikkou.api.ReconciliationContext;
import io.streamthoughts.jikkou.api.ReconciliationMode;
import io.streamthoughts.jikkou.api.change.ChangeExecutor;
import io.streamthoughts.jikkou.api.change.ChangeHandler;
import io.streamthoughts.jikkou.api.change.ChangeResult;
import io.streamthoughts.jikkou.api.config.Configuration;
import io.streamthoughts.jikkou.api.control.BaseResourceController;
import io.streamthoughts.jikkou.api.error.ConfigException;
import io.streamthoughts.jikkou.api.model.GenericResourceListObject;
import io.streamthoughts.jikkou.api.model.HasMetadata;
import io.streamthoughts.jikkou.api.model.HasMetadataChange;
import io.streamthoughts.jikkou.api.model.ResourceListObject;
import io.streamthoughts.jikkou.api.selector.AggregateSelector;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectExtensionConfig;
import io.streamthoughts.jikkou.kafka.connect.KafkaConnectLabels;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApi;
import io.streamthoughts.jikkou.kafka.connect.api.KafkaConnectApiFactory;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChange;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeComputer;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeDescription;
import io.streamthoughts.jikkou.kafka.connect.change.KafkaConnectorChangeHandler;
import io.streamthoughts.jikkou.kafka.connect.models.V1KafkaConnector;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jetbrains.annotations.NotNull;

@AcceptsResource(type = V1KafkaConnector.class)
@AcceptsReconciliationModes({ReconciliationMode.CREATE, ReconciliationMode.DELETE, ReconciliationMode.UPDATE, ReconciliationMode.APPLY_ALL})
/* loaded from: input_file:io/streamthoughts/jikkou/kafka/connect/control/KafkaConnectorController.class */
public final class KafkaConnectorController implements BaseResourceController<V1KafkaConnector, KafkaConnectorChange> {
    private KafkaConnectExtensionConfig configuration;
    private KafkaConnectorCollector collector;

    public void configure(@NotNull Configuration configuration) throws ConfigException {
        this.configuration = new KafkaConnectExtensionConfig(configuration);
        this.collector = new KafkaConnectorCollector();
        this.collector.configure(configuration);
    }

    public List<ChangeResult<KafkaConnectorChange>> execute(@NotNull List<HasMetadataChange<KafkaConnectorChange>> list, @NotNull ReconciliationMode reconciliationMode, boolean z) {
        Map groupByKafkaConnectCluster = groupByKafkaConnectCluster(list, hasMetadataChange -> {
            return true;
        });
        LinkedList linkedList = new LinkedList();
        for (Map.Entry entry : groupByKafkaConnectCluster.entrySet()) {
            String str = (String) entry.getKey();
            KafkaConnectApi create = KafkaConnectApiFactory.create(this.configuration.getConfigForCluster(str).get());
            try {
                linkedList.addAll(new ChangeExecutor(List.of(new KafkaConnectorChangeHandler(create, str), new ChangeHandler.None(hasMetadataChange2 -> {
                    return new KafkaConnectorChangeDescription(str, (KafkaConnectorChange) hasMetadataChange2.getChange());
                }))).execute((List) entry.getValue(), z));
                if (create != null) {
                    create.close();
                }
            } catch (Throwable th) {
                if (create != null) {
                    try {
                        create.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
        return linkedList;
    }

    public ResourceListObject<HasMetadataChange<KafkaConnectorChange>> computeReconciliationChanges(@NotNull Collection<V1KafkaConnector> collection, @NotNull ReconciliationMode reconciliationMode, @NotNull ReconciliationContext reconciliationContext) {
        AggregateSelector aggregateSelector = new AggregateSelector(reconciliationContext.selectors());
        Map groupByKafkaConnectCluster = groupByKafkaConnectCluster(collection, (v1) -> {
            return r2.apply(v1);
        });
        KafkaConnectorChangeComputer kafkaConnectorChangeComputer = new KafkaConnectorChangeComputer();
        LinkedList linkedList = new LinkedList();
        for (Map.Entry entry : groupByKafkaConnectCluster.entrySet()) {
            String str = (String) entry.getKey();
            List list = (List) entry.getValue();
            Stream<V1KafkaConnector> stream = this.collector.listAll(str, false).stream();
            AggregateSelector aggregateSelector2 = new AggregateSelector(reconciliationContext.selectors());
            linkedList.addAll(kafkaConnectorChangeComputer.computeChanges(stream.filter((v1) -> {
                return r1.apply(v1);
            }).toList(), list));
        }
        return GenericResourceListObject.builder().withItems(linkedList).build();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @NotNull
    private <T extends HasMetadata> Map<String, List<T>> groupByKafkaConnectCluster(@NotNull Collection<T> collection, @NotNull Predicate<T> predicate) {
        return (Map) collection.stream().filter(predicate).collect(Collectors.groupingBy(hasMetadata -> {
            return hasMetadata.getMetadata().getLabelByKey(KafkaConnectLabels.KAFKA_CONNECT_CLUSTER).toString();
        }, Collectors.toList()));
    }
}
