/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client.administration;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.DeleteException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.JetStreamSetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.SetupResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.administration.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.SetupConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import org.jboss.logging.Logger;

public class AdministrationConnection
extends AbstractConnection
implements io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection {
    private static final Logger logger = Logger.getLogger(AdministrationConnection.class);

    public AdministrationConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        super(connectionConfiguration, connectionListener);
    }

    @Override
    public Uni<ConsumerInfo> getConsumerInfo(String stream, String consumerName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                return jsm.getConsumerInfo(stream, consumerName);
            }
            catch (JetStreamApiException | IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override
    public Uni<List<String>> getStreams() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                return jsm.getStreamNames();
            }
            catch (JetStreamApiException | IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override
    public Uni<List<String>> getSubjects(String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> this.getStreamInfo(streamName).map(streamInfo -> streamInfo.getConfiguration().getSubjects()).orElseGet(List::of)));
    }

    @Override
    public Uni<List<String>> getConsumerNames(String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                return jsm.getConsumerNames(streamName);
            }
            catch (JetStreamApiException | IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override
    public Uni<PurgeResult> purgeStream(String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                PurgeResponse response = jsm.purgeStream(streamName);
                return new PurgeResult(streamName, response.isSuccess(), response.getPurged());
            }
            catch (JetStreamApiException | IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override
    public Uni<Void> deleteMessage(String stream, long sequence, boolean erase) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                if (!jsm.deleteMessage(stream, sequence, erase)) {
                    throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", stream, sequence));
                }
                return null;
            }
            catch (JetStreamApiException | IOException e) {
                throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", stream, sequence, e.getMessage()), e);
            }
        }));
    }

    @Override
    public Uni<StreamState> getStreamState(String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> this.getStreamInfo(streamName).map(streamInfo -> StreamState.of(streamInfo.getStreamState())).orElseThrow(() -> new AdministrationException("Stream state not found"))));
    }

    @Override
    public Uni<List<PurgeResult>> purgeAllStreams() {
        return this.getStreams().onItem().transformToUni(this::purgeAllStreams);
    }

    @Override
    public Uni<SetupResult> addOrUpdateStream(SetupConfiguration setupConfiguration) {
        return this.getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> this.addOrUpdateStream((JetStreamManagement)jetStreamManagement, setupConfiguration));
    }

    @Override
    public Uni<Void> addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                KeyValueManagement kvm = this.connection.keyValueManagement();
                if (kvm.getBucketNames().contains(keyValueSetupConfiguration.bucketName())) {
                    kvm.update(this.createKeyValueConfiguration(keyValueSetupConfiguration));
                } else {
                    kvm.create(this.createKeyValueConfiguration(keyValueSetupConfiguration));
                }
                return null;
            }
            catch (JetStreamApiException | IOException e) {
                throw new JetStreamSetupException(String.format("Unable to manage Key Value Store: %s", e.getMessage()), e);
            }
        }));
    }

    private Uni<JetStreamManagement> getJetStreamManagement() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.jetStreamManagement();
            }
            catch (IOException e) {
                throw new AdministrationException(String.format("Unable to manage JetStream: %s", e.getMessage()), e);
            }
        }));
    }

    private Uni<SetupResult> addOrUpdateStream(JetStreamManagement jsm, SetupConfiguration setupConfiguration) {
        return this.getStreamInfo(jsm, setupConfiguration.stream()).onItem().transformToUni(streamInfo -> this.updateStream(jsm, (StreamInfo)streamInfo, setupConfiguration)).onFailure().recoverWithUni(failure -> this.createStream(jsm, setupConfiguration));
    }

    private Optional<StreamInfo> getStreamInfo(String streamName) {
        try {
            JetStreamManagement jsm = this.connection.jetStreamManagement();
            return Optional.of(jsm.getStreamInfo(streamName, StreamInfoOptions.allSubjects()));
        }
        catch (JetStreamApiException | IOException e) {
            logger.debugf(e, "Unable to read stream %s with message: %s", (Object)streamName, (Object)e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<StreamInfo> getStreamInfo(JetStreamManagement jsm, String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return jsm.getStreamInfo(streamName, StreamInfoOptions.allSubjects());
            }
            catch (JetStreamApiException | IOException e) {
                throw new AdministrationException(String.format("Unable to read stream %s with message: %s", streamName, e.getMessage()), e);
            }
        }));
    }

    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String streamName) {
        try {
            PurgeResponse response = jetStreamManagement.purgeStream(streamName);
            return Optional.of(new PurgeResult(streamName, response.isSuccess(), response.getPurged()));
        }
        catch (JetStreamApiException | IOException e) {
            logger.warnf(e, "Unable to purge stream %s with message: %s", (Object)streamName, (Object)e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<List<PurgeResult>> purgeAllStreams(List<String> streams) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jsm = this.connection.jetStreamManagement();
                return streams.stream().flatMap(streamName -> this.purgeStream(jsm, (String)streamName).stream()).toList();
            }
            catch (IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    private Uni<SetupResult> updateStream(JetStreamManagement jsm, StreamInfo streamInfo, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                StreamConfiguration configuration = streamInfo.getConfiguration();
                HashSet currentSubjects = new HashSet(configuration.getSubjects());
                if (!currentSubjects.containsAll(setupConfiguration.subjects())) {
                    StreamConfiguration streamConfiguration = streamInfo.getConfiguration();
                    HashSet<String> newSubjects = new HashSet<String>(streamConfiguration.getSubjects());
                    newSubjects.addAll(setupConfiguration.subjects());
                    logger.debugf("Updating stream %s with subjects %s", (Object)streamConfiguration.getName(), newSubjects);
                    return new SetupResult(jsm.updateStream(StreamConfiguration.builder((StreamConfiguration)streamConfiguration).subjects(newSubjects).build()));
                }
                return new SetupResult(streamInfo);
            }
            catch (JetStreamApiException | IOException e) {
                throw new JetStreamSetupException(String.format("Unable to update stream: %s with message: %s", setupConfiguration.stream(), e.getMessage()), e);
            }
        }));
    }

    private Uni<SetupResult> createStream(JetStreamManagement jsm, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                StreamConfiguration.Builder streamConfigBuilder = StreamConfiguration.builder().name(setupConfiguration.stream()).storageType(setupConfiguration.storageType()).retentionPolicy(setupConfiguration.retentionPolicy()).replicas(setupConfiguration.replicas().intValue()).subjects(setupConfiguration.subjects());
                return new SetupResult(jsm.addStream(streamConfigBuilder.build()));
            }
            catch (JetStreamApiException | IOException e) {
                throw new JetStreamSetupException(String.format("Unable to create stream: %s with message: %s", setupConfiguration.stream(), e.getMessage()), e);
            }
        }));
    }

    private KeyValueConfiguration createKeyValueConfiguration(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        KeyValueConfiguration.Builder builder = KeyValueConfiguration.builder();
        builder = builder.name(keyValueSetupConfiguration.bucketName());
        builder = keyValueSetupConfiguration.description().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).description(arg_0)).orElse(builder);
        builder = builder.storageType(keyValueSetupConfiguration.storageType());
        builder = keyValueSetupConfiguration.maxBucketSize().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).maxBucketSize(arg_0)).orElse(builder);
        builder = keyValueSetupConfiguration.maxHistoryPerKey().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).maxHistoryPerKey(arg_0)).orElse(builder);
        builder = keyValueSetupConfiguration.maxValueSize().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).maximumValueSize(arg_0)).orElse(builder);
        builder = keyValueSetupConfiguration.ttl().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).ttl(arg_0)).orElse(builder);
        builder = keyValueSetupConfiguration.replicas().map(arg_0 -> ((KeyValueConfiguration.Builder)builder).replicas(arg_0)).orElse(builder);
        builder = builder.compression(keyValueSetupConfiguration.compressed().booleanValue());
        return builder.build();
    }
}

