package io.quarkiverse.reactive.messaging.nats.jetstream.setup;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.JetStreamBuildConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.Connection;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/setup/JetStreamSetup.class */
public class JetStreamSetup {
    private static final Logger logger = Logger.getLogger(JetStreamSetup.class);

    public void setup(Connection connection, JetStreamBuildConfiguration jetStreamBuildConfiguration) {
        try {
            if (jetStreamBuildConfiguration.autoConfigure().booleanValue()) {
                JetStreamSetupConfiguration.of(jetStreamBuildConfiguration).forEach(jetStreamSetupConfiguration -> {
                    addOrUpdateStream(connection, jetStreamSetupConfiguration);
                });
                KeyValueSetupConfiguration.of(jetStreamBuildConfiguration).forEach(keyValueSetupConfiguration -> {
                    addOrUpdateKeyValueStore(connection, keyValueSetupConfiguration);
                });
            }
        } catch (Exception e) {
            throw new JetStreamSetupException(String.format("Unable to configure stream: %s", e.getMessage()), e);
        }
    }

    public Optional<SetupResult> addOrUpdateStream(Connection connection, JetStreamSetupConfiguration jetStreamSetupConfiguration) {
        try {
            return addOrUpdateStream(connection, connection.jetStreamManagement(), jetStreamSetupConfiguration);
        } catch (IOException e) {
            throw new JetStreamSetupException(String.format("Unable to manage JetStream: %s", e.getMessage()), e);
        }
    }

    private Optional<SetupResult> addOrUpdateStream(Connection connection, JetStreamManagement jetStreamManagement, JetStreamSetupConfiguration jetStreamSetupConfiguration) {
        return (Optional) getStreamInfo(jetStreamManagement, jetStreamSetupConfiguration.stream()).map(streamInfo -> {
            return updateStream(connection, jetStreamManagement, streamInfo, jetStreamSetupConfiguration);
        }).orElseGet(() -> {
            return createStream(connection, jetStreamManagement, jetStreamSetupConfiguration);
        });
    }

    private Optional<SetupResult> updateStream(Connection connection, JetStreamManagement jetStreamManagement, StreamInfo streamInfo, JetStreamSetupConfiguration jetStreamSetupConfiguration) {
        try {
            if (new HashSet(streamInfo.getConfiguration().getSubjects()).containsAll(jetStreamSetupConfiguration.subjects())) {
                return Optional.of(new SetupResult(connection, streamInfo));
            }
            StreamConfiguration configuration = streamInfo.getConfiguration();
            HashSet hashSet = new HashSet(configuration.getSubjects());
            hashSet.addAll(jetStreamSetupConfiguration.subjects());
            logger.debugf("Updating stream %s with subjects %s", configuration.getName(), hashSet);
            return Optional.of(new SetupResult(connection, jetStreamManagement.updateStream(StreamConfiguration.builder(configuration).subjects(hashSet).build())));
        } catch (IOException | JetStreamApiException e) {
            logger.warnf(e, "Unable to update stream: %s with message: %s", jetStreamSetupConfiguration.stream(), e.getMessage());
            return Optional.empty();
        }
    }

    private Optional<SetupResult> createStream(Connection connection, JetStreamManagement jetStreamManagement, JetStreamSetupConfiguration jetStreamSetupConfiguration) {
        try {
            return Optional.of(new SetupResult(connection, jetStreamManagement.addStream(StreamConfiguration.builder().name(jetStreamSetupConfiguration.stream()).storageType(jetStreamSetupConfiguration.storageType()).retentionPolicy(jetStreamSetupConfiguration.retentionPolicy()).replicas(jetStreamSetupConfiguration.replicas().intValue()).subjects(jetStreamSetupConfiguration.subjects()).build())));
        } catch (IOException | JetStreamApiException e) {
            logger.warnf(e, "Unable to create stream: %s with message: %s", jetStreamSetupConfiguration.stream(), e.getMessage());
            return Optional.empty();
        }
    }

    private Optional<StreamInfo> getStreamInfo(JetStreamManagement jetStreamManagement, String str) {
        try {
            return Optional.of(jetStreamManagement.getStreamInfo(str, StreamInfoOptions.allSubjects()));
        } catch (IOException | JetStreamApiException e) {
            return Optional.empty();
        }
    }

    private void addOrUpdateKeyValueStore(Connection connection, KeyValueSetupConfiguration keyValueSetupConfiguration) {
        try {
            KeyValueManagement keyValueManagement = connection.keyValueManagement();
            if (keyValueManagement.getBucketNames().contains(keyValueSetupConfiguration.bucketName())) {
                keyValueManagement.update(createKeyValueConfiguration(keyValueSetupConfiguration));
            } else {
                keyValueManagement.create(createKeyValueConfiguration(keyValueSetupConfiguration));
            }
        } catch (IOException | JetStreamApiException e) {
            throw new JetStreamSetupException(String.format("Unable to manage Key Value Store: %s", e.getMessage()), e);
        }
    }

    private KeyValueConfiguration createKeyValueConfiguration(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        KeyValueConfiguration.Builder name = KeyValueConfiguration.builder().name(keyValueSetupConfiguration.bucketName());
        Optional<String> description = keyValueSetupConfiguration.description();
        Objects.requireNonNull(name);
        KeyValueConfiguration.Builder storageType = ((KeyValueConfiguration.Builder) description.map(name::description).orElse(name)).storageType(keyValueSetupConfiguration.storageType());
        Optional<Long> maxBucketSize = keyValueSetupConfiguration.maxBucketSize();
        Objects.requireNonNull(storageType);
        KeyValueConfiguration.Builder builder = (KeyValueConfiguration.Builder) maxBucketSize.map((v1) -> {
            return r1.maxBucketSize(v1);
        }).orElse(storageType);
        Optional<Integer> maxHistoryPerKey = keyValueSetupConfiguration.maxHistoryPerKey();
        Objects.requireNonNull(builder);
        KeyValueConfiguration.Builder builder2 = (KeyValueConfiguration.Builder) maxHistoryPerKey.map((v1) -> {
            return r1.maxHistoryPerKey(v1);
        }).orElse(builder);
        Optional<Long> maxValueSize = keyValueSetupConfiguration.maxValueSize();
        Objects.requireNonNull(builder2);
        KeyValueConfiguration.Builder builder3 = (KeyValueConfiguration.Builder) maxValueSize.map((v1) -> {
            return r1.maxValueSize(v1);
        }).orElse(builder2);
        Optional<Duration> ttl = keyValueSetupConfiguration.ttl();
        Objects.requireNonNull(builder3);
        KeyValueConfiguration.Builder builder4 = (KeyValueConfiguration.Builder) ttl.map(builder3::ttl).orElse(builder3);
        Optional<Integer> replicas = keyValueSetupConfiguration.replicas();
        Objects.requireNonNull(builder4);
        return ((KeyValueConfiguration.Builder) replicas.map((v1) -> {
            return r1.replicas(v1);
        }).orElse(builder4)).compression(keyValueSetupConfiguration.compressed().booleanValue()).build();
    }
}
