package io.quarkiverse.reactive.messaging.nats.jetstream.client.administration;

import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.KeyValueManagement;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.KeyValueConfiguration;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AbstractConnection;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.ConnectionListener;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.ConnectionConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.KeyValueSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.SetupConfiguration;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import java.io.IOException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.jboss.logging.Logger;

/* loaded from: input_file:io/quarkiverse/reactive/messaging/nats/jetstream/client/administration/AdministrationConnection.class */
public class AdministrationConnection extends AbstractConnection implements io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection {
    private static final Logger logger = Logger.getLogger(AdministrationConnection.class);

    public AdministrationConnection(ConnectionConfiguration connectionConfiguration, ConnectionListener connectionListener) {
        super(connectionConfiguration, connectionListener);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<ConsumerInfo> getConsumerInfo(String str, String str2) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.jetStreamManagement().getConsumerInfo(str, str2);
            } catch (IOException | JetStreamApiException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<List<String>> getStreams() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.jetStreamManagement().getStreamNames();
            } catch (IOException | JetStreamApiException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<List<String>> getSubjects(String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            return (List) getStreamInfo(str).map(streamInfo -> {
                return streamInfo.getConfiguration().getSubjects();
            }).orElseGet(List::of);
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<List<String>> getConsumerNames(String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.jetStreamManagement().getConsumerNames(str);
            } catch (IOException | JetStreamApiException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<PurgeResult> purgeStream(String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                PurgeResponse purgeStream = this.connection.jetStreamManagement().purgeStream(str);
                return new PurgeResult(str, purgeStream.isSuccess(), purgeStream.getPurged());
            } catch (IOException | JetStreamApiException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<Void> deleteMessage(String str, long j, boolean z) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                if (this.connection.jetStreamManagement().deleteMessage(str, j, z)) {
                    return null;
                }
                throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", str, Long.valueOf(j)));
            } catch (IOException | JetStreamApiException e) {
                throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", str, Long.valueOf(j), e.getMessage()), e);
            }
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<StreamState> getStreamState(String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            return (StreamState) getStreamInfo(str).map(streamInfo -> {
                return StreamState.of(streamInfo.getStreamState());
            }).orElseThrow(() -> {
                return new AdministrationException("Stream state not found");
            });
        }));
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<List<PurgeResult>> purgeAllStreams() {
        return getStreams().onItem().transformToUni(this::purgeAllStreams);
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<SetupResult> addOrUpdateStream(SetupConfiguration setupConfiguration) {
        return getJetStreamManagement().onItem().transformToUni(jetStreamManagement -> {
            return addOrUpdateStream(jetStreamManagement, setupConfiguration);
        });
    }

    @Override // io.quarkiverse.reactive.messaging.nats.jetstream.client.AdministrationConnection
    public Uni<Void> addOrUpdateKeyValueStore(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                KeyValueManagement keyValueManagement = this.connection.keyValueManagement();
                if (keyValueManagement.getBucketNames().contains(keyValueSetupConfiguration.bucketName())) {
                    keyValueManagement.update(createKeyValueConfiguration(keyValueSetupConfiguration));
                    return null;
                }
                keyValueManagement.create(createKeyValueConfiguration(keyValueSetupConfiguration));
                return null;
            } catch (IOException | JetStreamApiException e) {
                throw new JetStreamSetupException(String.format("Unable to manage Key Value Store: %s", e.getMessage()), e);
            }
        }));
    }

    private Uni<JetStreamManagement> getJetStreamManagement() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return this.connection.jetStreamManagement();
            } catch (IOException e) {
                throw new AdministrationException(String.format("Unable to manage JetStream: %s", e.getMessage()), e);
            }
        }));
    }

    private Uni<SetupResult> addOrUpdateStream(JetStreamManagement jetStreamManagement, SetupConfiguration setupConfiguration) {
        return getStreamInfo(jetStreamManagement, setupConfiguration.stream()).onItem().transformToUni(streamInfo -> {
            return updateStream(jetStreamManagement, streamInfo, setupConfiguration);
        }).onFailure().recoverWithUni(th -> {
            return createStream(jetStreamManagement, setupConfiguration);
        });
    }

    private Optional<StreamInfo> getStreamInfo(String str) {
        try {
            return Optional.of(this.connection.jetStreamManagement().getStreamInfo(str, StreamInfoOptions.allSubjects()));
        } catch (IOException | JetStreamApiException e) {
            logger.debugf(e, "Unable to read stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<StreamInfo> getStreamInfo(JetStreamManagement jetStreamManagement, String str) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return jetStreamManagement.getStreamInfo(str, StreamInfoOptions.allSubjects());
            } catch (IOException | JetStreamApiException e) {
                throw new AdministrationException(String.format("Unable to read stream %s with message: %s", str, e.getMessage()), e);
            }
        }));
    }

    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String str) {
        try {
            PurgeResponse purgeStream = jetStreamManagement.purgeStream(str);
            return Optional.of(new PurgeResult(str, purgeStream.isSuccess(), purgeStream.getPurged()));
        } catch (IOException | JetStreamApiException e) {
            logger.warnf(e, "Unable to purge stream %s with message: %s", str, e.getMessage());
            return Optional.empty();
        }
    }

    private Uni<List<PurgeResult>> purgeAllStreams(List<String> list) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                JetStreamManagement jetStreamManagement = this.connection.jetStreamManagement();
                return list.stream().flatMap(str -> {
                    return purgeStream(jetStreamManagement, str).stream();
                }).toList();
            } catch (IOException e) {
                throw new AdministrationException(e);
            }
        }));
    }

    private Uni<SetupResult> updateStream(JetStreamManagement jetStreamManagement, StreamInfo streamInfo, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                if (new HashSet(streamInfo.getConfiguration().getSubjects()).containsAll(setupConfiguration.subjects())) {
                    return new SetupResult(streamInfo);
                }
                StreamConfiguration configuration = streamInfo.getConfiguration();
                HashSet hashSet = new HashSet(configuration.getSubjects());
                hashSet.addAll(setupConfiguration.subjects());
                logger.debugf("Updating stream %s with subjects %s", configuration.getName(), hashSet);
                return new SetupResult(jetStreamManagement.updateStream(StreamConfiguration.builder(configuration).subjects(hashSet).build()));
            } catch (IOException | JetStreamApiException e) {
                throw new JetStreamSetupException(String.format("Unable to update stream: %s with message: %s", setupConfiguration.stream(), e.getMessage()), e);
            }
        }));
    }

    private Uni<SetupResult> createStream(JetStreamManagement jetStreamManagement, SetupConfiguration setupConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                return new SetupResult(jetStreamManagement.addStream(StreamConfiguration.builder().name(setupConfiguration.stream()).storageType(setupConfiguration.storageType()).retentionPolicy(setupConfiguration.retentionPolicy()).replicas(setupConfiguration.replicas().intValue()).subjects(setupConfiguration.subjects()).build()));
            } catch (IOException | JetStreamApiException e) {
                throw new JetStreamSetupException(String.format("Unable to create stream: %s with message: %s", setupConfiguration.stream(), e.getMessage()), e);
            }
        }));
    }

    private KeyValueConfiguration createKeyValueConfiguration(KeyValueSetupConfiguration keyValueSetupConfiguration) {
        KeyValueConfiguration.Builder name = KeyValueConfiguration.builder().name(keyValueSetupConfiguration.bucketName());
        Optional<String> description = keyValueSetupConfiguration.description();
        Objects.requireNonNull(name);
        KeyValueConfiguration.Builder storageType = ((KeyValueConfiguration.Builder) description.map(name::description).orElse(name)).storageType(keyValueSetupConfiguration.storageType());
        Optional<Long> maxBucketSize = keyValueSetupConfiguration.maxBucketSize();
        Objects.requireNonNull(storageType);
        KeyValueConfiguration.Builder builder = (KeyValueConfiguration.Builder) maxBucketSize.map((v1) -> {
            return r1.maxBucketSize(v1);
        }).orElse(storageType);
        Optional<Integer> maxHistoryPerKey = keyValueSetupConfiguration.maxHistoryPerKey();
        Objects.requireNonNull(builder);
        KeyValueConfiguration.Builder builder2 = (KeyValueConfiguration.Builder) maxHistoryPerKey.map((v1) -> {
            return r1.maxHistoryPerKey(v1);
        }).orElse(builder);
        Optional<Integer> maxValueSize = keyValueSetupConfiguration.maxValueSize();
        Objects.requireNonNull(builder2);
        KeyValueConfiguration.Builder builder3 = (KeyValueConfiguration.Builder) maxValueSize.map((v1) -> {
            return r1.maximumValueSize(v1);
        }).orElse(builder2);
        Optional<Duration> ttl = keyValueSetupConfiguration.ttl();
        Objects.requireNonNull(builder3);
        KeyValueConfiguration.Builder builder4 = (KeyValueConfiguration.Builder) ttl.map(builder3::ttl).orElse(builder3);
        Optional<Integer> replicas = keyValueSetupConfiguration.replicas();
        Objects.requireNonNull(builder4);
        return ((KeyValueConfiguration.Builder) replicas.map((v1) -> {
            return r1.replicas(v1);
        }).orElse(builder4)).compression(keyValueSetupConfiguration.compressed().booleanValue()).build();
    }
}
