package io.quarkiverse.reactive.messaging.nats.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.Nats;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.NatsConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SystemException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionOptionsFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.SetupConfiguration;
import io.quarkus.runtime.RuntimeValue;
import io.quarkus.runtime.annotations.Recorder;
import io.smallrye.mutiny.Uni;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import org.jboss.logging.Logger;

/**
 * Quarkus recorder that, when auto-configuration is enabled, connects to NATS and
 * creates or updates the configured JetStream streams and Key Value buckets.
 */
@Recorder
public class JetStreamRecorder {
    private static final Logger logger = Logger.getLogger(JetStreamRecorder.class);
    private final RuntimeValue<NatsConfiguration> natsConfiguration;
    private final RuntimeValue<JetStreamBuildConfiguration> jetStreamConfiguration;

    public JetStreamRecorder(RuntimeValue<NatsConfiguration> runtimeValue, RuntimeValue<JetStreamBuildConfiguration> runtimeValue2) {
        this.natsConfiguration = runtimeValue;
        this.jetStreamConfiguration = runtimeValue2;
    }

    /**
     * Sets up every configured stream and Key Value store, blocking until each one is done.
     * Does nothing when {@code autoConfigure} is disabled.
     *
     * @throws SetupException if connecting, configuring, or closing the connection fails
     */
    public void setupStreams() {
        JetStreamBuildConfiguration configuration = this.jetStreamConfiguration.getValue();
        if (!configuration.autoConfigure()) {
            return;
        }
        // try-with-resources guarantees the connection is closed even when a setup step fails.
        try (Connection connection = connect()) {
            SetupConfiguration.of(configuration)
                    .forEach(setup -> addOrUpdateStream(connection, setup).await().indefinitely());
            KeyValueSetupConfiguration.of(configuration)
                    .forEach(setup -> addOrUpdateKeyValueStore(connection, setup).await().indefinitely());
        } catch (Throwable failure) {
            if (failure instanceof InterruptedException) {
                // Closing the connection may be interrupted; keep the interrupt flag for callers.
                Thread.currentThread().interrupt();
            }
            throw new SetupException(String.format("Unable to configure stream: %s", failure.getMessage()), failure);
        }
    }

    /**
     * Opens a NATS connection using options derived from the NATS configuration.
     *
     * @throws ConnectionException wrapping any failure raised while connecting
     */
    private Connection connect() throws ConnectionException {
        try {
            ConnectionConfiguration configuration = ConnectionConfiguration.of(this.natsConfiguration.getValue());
            return Nats.connect(new ConnectionOptionsFactory().create(configuration));
        } catch (Throwable failure) {
            throw new ConnectionException(failure);
        }
    }

    /** Obtains the JetStream management API from the connection, then creates or updates the stream. */
    private Uni<StreamInfo> addOrUpdateStream(Connection connection, SetupConfiguration setupConfiguration) {
        return getJetStreamManagement(connection)
                .onItem().transformToUni(management -> addOrUpdateStream(management, setupConfiguration));
    }

    /**
     * Updates the bucket when its name is already known to the server, otherwise creates it.
     * Any failure is reported as a {@link SetupException}.
     */
    private Uni<Void> addOrUpdateKeyValueStore(Connection connection, KeyValueSetupConfiguration keyValueSetupConfiguration) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                KeyValueManagement management = connection.keyValueManagement();
                KeyValueConfiguration configuration = createKeyValueConfiguration(keyValueSetupConfiguration);
                if (management.getBucketNames().contains(keyValueSetupConfiguration.bucketName())) {
                    management.update(configuration);
                } else {
                    management.create(configuration);
                }
                emitter.complete(null);
            } catch (Throwable failure) {
                emitter.fail(new SetupException(String.format("Unable to manage Key Value Store: %s", failure.getMessage()), failure));
            }
        });
    }

    /**
     * Looks the stream up; updates it when the lookup succeeds and creates it when the lookup fails.
     * A failure of the update itself is propagated as-is rather than triggering a create attempt,
     * so the real cause is not hidden behind a "create" error.
     */
    private Uni<StreamInfo> addOrUpdateStream(JetStreamManagement jetStreamManagement, SetupConfiguration setupConfiguration) {
        return getStreamInfo(jetStreamManagement, setupConfiguration.stream())
                .onItemOrFailure().transformToUni((streamInfo, lookupFailure) -> lookupFailure == null
                        ? updateStream(jetStreamManagement, streamInfo, setupConfiguration)
                        : createStream(jetStreamManagement, setupConfiguration));
    }

    /** Builds the bucket configuration, applying optional settings only when they are present. */
    private KeyValueConfiguration createKeyValueConfiguration(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        KeyValueConfiguration.Builder builder = KeyValueConfiguration.builder().name(keyValueSetupConfiguration.bucketName());
        keyValueSetupConfiguration.description().ifPresent(value -> builder.description(value));
        builder.storageType(keyValueSetupConfiguration.storageType());
        keyValueSetupConfiguration.maxBucketSize().ifPresent(value -> builder.maxBucketSize(value));
        keyValueSetupConfiguration.maxHistoryPerKey().ifPresent(value -> builder.maxHistoryPerKey(value));
        keyValueSetupConfiguration.maxValueSize().ifPresent(value -> builder.maximumValueSize(value));
        keyValueSetupConfiguration.ttl().ifPresent(value -> builder.ttl(value));
        keyValueSetupConfiguration.replicas().ifPresent(value -> builder.replicas(value));
        return builder.compression(keyValueSetupConfiguration.compressed()).build();
    }

    /** Creates the stream from the setup configuration; failures become {@link SetupException}. */
    private Uni<StreamInfo> createStream(JetStreamManagement jetStreamManagement, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                StreamConfiguration configuration = StreamConfiguration.builder()
                        .name(setupConfiguration.stream())
                        .storageType(setupConfiguration.storageType())
                        .retentionPolicy(setupConfiguration.retentionPolicy())
                        .replicas(setupConfiguration.replicas())
                        .subjects(setupConfiguration.subjects())
                        .build();
                emitter.complete(jetStreamManagement.addStream(configuration));
            } catch (Throwable failure) {
                emitter.fail(new SetupException(String.format("Unable to create stream: %s with message: %s", setupConfiguration.stream(), failure.getMessage()), failure));
            }
        });
    }

    /**
     * Adds any configured subjects the existing stream lacks. When the stream already has all of
     * them, the given {@code streamInfo} is emitted unchanged and no update call is made.
     */
    private Uni<StreamInfo> updateStream(JetStreamManagement jetStreamManagement, StreamInfo streamInfo, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                StreamConfiguration current = streamInfo.getConfiguration();
                HashSet<String> subjects = new HashSet<>(current.getSubjects());
                if (subjects.containsAll(setupConfiguration.subjects())) {
                    emitter.complete(streamInfo);
                } else {
                    // Merge rather than replace so subjects already on the stream are kept.
                    subjects.addAll(setupConfiguration.subjects());
                    logger.debugf("Updating stream %s with subjects %s", current.getName(), subjects);
                    emitter.complete(jetStreamManagement.updateStream(StreamConfiguration.builder(current).subjects(subjects).build()));
                }
            } catch (Throwable failure) {
                emitter.fail(new SetupException(String.format("Unable to update stream: %s with message: %s", setupConfiguration.stream(), failure.getMessage()), failure));
            }
        });
    }

    /** Emits the connection's JetStream management API; failures become {@link SystemException}. */
    private Uni<JetStreamManagement> getJetStreamManagement(Connection connection) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                emitter.complete(connection.jetStreamManagement());
            } catch (Throwable failure) {
                emitter.fail(new SystemException(String.format("Unable to manage JetStream: %s", failure.getMessage()), failure));
            }
        });
    }

    /** Reads stream info (requesting all subjects) for the named stream; failures become {@link SystemException}. */
    private Uni<StreamInfo> getStreamInfo(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().emitter(emitter -> {
            try {
                emitter.complete(jetStreamManagement.getStreamInfo(str, StreamInfoOptions.allSubjects()));
            } catch (Throwable failure) {
                emitter.fail(new SystemException(String.format("Unable to read stream %s with message: %s", str, failure.getMessage()), failure));
            }
        });
    }
}
