/*
 * Decompiled with CFR 0.152.
 */
package io.quarkiverse.reactive.messaging.nats.jetstream.client;

import io.nats.client.Connection;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.api.ConsumerPauseResponse;
import io.nats.client.api.PurgeResponse;
import io.nats.client.api.StreamInfo;
import io.nats.client.api.StreamInfoOptions;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.DeleteException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SetupException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.StreamManagement;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.SystemException;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.Consumer;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.PurgeResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamResult;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamSetupConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamState;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.api.StreamStatus;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfiguration;
import io.quarkiverse.reactive.messaging.nats.jetstream.client.configuration.StreamConfigurationFactory;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.ConsumerMapper;
import io.quarkiverse.reactive.messaging.nats.jetstream.mapper.StreamStateMapper;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.unchecked.Unchecked;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import lombok.Generated;
import org.jboss.logging.Logger;

class DefaultStreamManagement
implements StreamManagement {
    @Generated
    private static final Logger log = Logger.getLogger(DefaultStreamManagement.class);
    private final Connection connection;
    private final StreamStateMapper streamStateMapper;
    private final ConsumerMapper consumerMapper;
    private final Vertx vertx;

    /**
     * Looks up the consumer {@code consumerName} of {@code stream} and converts the client's consumer
     * info with the configured {@link ConsumerMapper}.
     *
     * <p>The lookup is submitted through {@code executeBlocking} of the Vert.x context returned by
     * {@link #context()}. A {@link JetStreamApiException} or {@link IOException} raised by the client
     * is rethrown wrapped in a {@link SystemException}.
     *
     * @param stream name of the stream that owns the consumer
     * @param consumerName name of the consumer to look up
     * @return a {@link Uni} emitting the mapped consumer
     */
    @Override
    public Uni<Consumer> getConsumer(String stream, String consumerName) {
        Uni<Consumer> lookup = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        return management.getConsumerInfo(stream, consumerName);
                    } catch (JetStreamApiException | IOException e) {
                        throw new SystemException(e);
                    }
                })))
                .onItem().transform(consumerMapper::of);
        return context().executeBlocking(lookup);
    }

    /**
     * Deletes the consumer {@code consumerName} from the stream {@code streamName}.
     *
     * <p>Whatever the client's {@code deleteConsumer} call returns is ignored; only a
     * {@link JetStreamApiException} or {@link IOException} turns into a failure, wrapped in a
     * {@link SystemException}.
     *
     * @param streamName name of the stream that owns the consumer
     * @param consumerName name of the consumer to delete
     * @return a {@link Uni} emitting {@code null} once the call has returned
     */
    @Override
    public Uni<Void> deleteConsumer(String streamName, String consumerName) {
        Uni<Void> deletion = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        management.deleteConsumer(streamName, consumerName);
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                    return null;
                })));
        return context().executeBlocking(deletion);
    }

    /**
     * Pauses the consumer {@code consumerName} of the stream {@code streamName} until
     * {@code pauseUntil}.
     *
     * <p>Fails with a {@link SystemException} when the pause response does not report the consumer
     * as paused, or when the client throws a {@link JetStreamApiException} or {@link IOException}.
     *
     * @param streamName name of the stream that owns the consumer
     * @param consumerName name of the consumer to pause
     * @param pauseUntil point in time passed to the client as the pause deadline
     * @return a {@link Uni} emitting {@code null} when the consumer is reported as paused
     */
    @Override
    public Uni<Void> pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) {
        Uni<Void> pause = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        ConsumerPauseResponse outcome = management.pauseConsumer(streamName, consumerName, pauseUntil);
                        if (outcome.isPaused()) {
                            return null;
                        }
                        throw new SystemException(String.format("Unable to pause consumer %s in stream %s", consumerName, streamName));
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(pause);
    }

    /**
     * Resumes the consumer {@code consumerName} of the stream {@code streamName}.
     *
     * <p>Fails with a {@link SystemException} when the client reports {@code false}, or when it
     * throws a {@link JetStreamApiException} or {@link IOException}.
     *
     * @param streamName name of the stream that owns the consumer
     * @param consumerName name of the consumer to resume
     * @return a {@link Uni} emitting {@code null} when the client reports success
     */
    @Override
    public Uni<Void> resumeConsumer(String streamName, String consumerName) {
        Uni<Void> resume = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        boolean resumed = management.resumeConsumer(streamName, consumerName);
                        if (resumed) {
                            return null;
                        }
                        throw new SystemException(String.format("Unable to resume consumer %s in stream %s", consumerName, streamName));
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(resume);
    }

    /**
     * Reads the first sequence number from the stream state of {@code streamName}.
     *
     * @param streamName name of the stream to inspect
     * @return a {@link Uni} emitting the first sequence reported in the stream info
     */
    @Override
    public Uni<Long> getFirstSequence(String streamName) {
        Uni<Long> firstSequence = getStreamInfo(streamName)
                .onItem().transform(info -> info.streamInfo().getStreamState().getFirstSequence());
        return context().executeBlocking(firstSequence);
    }

    /**
     * Lists the names of all streams known to the JetStream management API.
     *
     * <p>A {@link JetStreamApiException} or {@link IOException} raised by the client is rethrown
     * wrapped in a {@link SystemException}, matching the other operations of this class (previously
     * the checked exception was left to {@code Unchecked.supplier}).
     *
     * @return a {@link Uni} emitting the stream names
     */
    @Override
    public Uni<List<String>> getStreams() {
        Uni<List<String>> names = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        return management.getStreamNames();
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(names);
    }

    /**
     * Reads the subjects configured on the stream {@code streamName}.
     *
     * @param streamName name of the stream to inspect
     * @return a {@link Uni} emitting the subject list taken from the stream's configuration
     */
    @Override
    public Uni<List<String>> getSubjects(String streamName) {
        Uni<List<String>> subjects = getStreamInfo(streamName)
                .onItem().transform(info -> info.streamInfo().getConfiguration().getSubjects());
        return context().executeBlocking(subjects);
    }

    /**
     * Lists the names of the consumers attached to the stream {@code streamName}.
     *
     * <p>A {@link JetStreamApiException} or {@link IOException} raised by the client is rethrown
     * wrapped in a {@link SystemException}.
     *
     * @param streamName name of the stream whose consumers are listed
     * @return a {@link Uni} emitting the consumer names
     */
    @Override
    public Uni<List<String>> getConsumerNames(String streamName) {
        Uni<List<String>> names = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        return management.getConsumerNames(streamName);
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(names);
    }

    /**
     * Purges the stream {@code streamName} and reports the outcome.
     *
     * <p>A {@link JetStreamApiException} or {@link IOException} raised by the client is rethrown
     * wrapped in a {@link SystemException}.
     *
     * @param streamName name of the stream to purge
     * @return a {@link Uni} emitting a {@link PurgeResult} built from the stream name and the
     *         success flag and purged count of the purge response
     */
    @Override
    public Uni<PurgeResult> purgeStream(String streamName) {
        Uni<PurgeResult> purge = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        PurgeResponse outcome = management.purgeStream(streamName);
                        return new PurgeResult(streamName, outcome.isSuccess(), outcome.getPurged());
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(purge);
    }

    /**
     * Deletes the message with the given {@code sequence} from {@code stream}.
     *
     * <p>Fails with a {@link DeleteException} when the client reports {@code false}, or when it
     * throws a {@link JetStreamApiException} or {@link IOException} (kept as the cause).
     *
     * @param stream name of the stream holding the message
     * @param sequence stream sequence of the message to delete
     * @param erase flag forwarded unchanged to the client's {@code deleteMessage} call
     * @return a {@link Uni} emitting {@code null} when the client reports success
     */
    @Override
    public Uni<Void> deleteMessage(String stream, long sequence, boolean erase) {
        Uni<Void> deletion = getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        boolean deleted = management.deleteMessage(stream, sequence, erase);
                        if (deleted) {
                            return null;
                        }
                        throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d", stream, sequence));
                    } catch (JetStreamApiException | IOException failure) {
                        throw new DeleteException(String.format("Unable to delete message in stream %s with sequence %d: %s", stream, sequence, failure.getMessage()), failure);
                    }
                })));
        return context().executeBlocking(deletion);
    }

    /**
     * Reads the state of the stream {@code streamName} and converts it with the configured
     * {@link StreamStateMapper}.
     *
     * @param streamName name of the stream to inspect
     * @return a {@link Uni} emitting the mapped stream state
     */
    @Override
    public Uni<StreamState> getStreamState(String streamName) {
        Uni<StreamState> state = getStreamInfo(streamName)
                .onItem().transform(info -> streamStateMapper.of(info.streamInfo().getStreamState()));
        return context().executeBlocking(state);
    }

    /**
     * Reads the configuration of the stream {@code streamName} and converts it with
     * {@code StreamConfiguration.of}.
     *
     * @param streamName name of the stream to inspect
     * @return a {@link Uni} emitting the converted stream configuration
     */
    @Override
    public Uni<StreamConfiguration> getStreamConfiguration(String streamName) {
        Uni<StreamConfiguration> configuration = getStreamInfo(streamName)
                .onItem().transform(info -> StreamConfiguration.of(info.streamInfo().getConfiguration()));
        return context().executeBlocking(configuration);
    }

    /**
     * Purges every stream returned by {@link #getStreams()}.
     *
     * @return a {@link Uni} emitting one {@link PurgeResult} per stream that could be purged
     */
    @Override
    public Uni<List<PurgeResult>> purgeAllStreams() {
        Uni<List<PurgeResult>> purge = getStreams()
                .onItem().transformToUni(names -> purgeAllStreams(names));
        return context().executeBlocking(purge);
    }

    /**
     * Creates or updates one stream per entry of {@code streamConfigurations}.
     *
     * <p>Each configuration is paired with the management handle and passed to
     * {@code addOrUpdateStream}; the individual results are merged and collected into a list.
     *
     * @param streamConfigurations the stream setups to apply
     * @return a {@link Uni} emitting the collected {@link StreamResult}s
     */
    @Override
    public Uni<List<StreamResult>> addStreams(List<StreamSetupConfiguration> streamConfigurations) {
        Multi<StreamSetupConfigurationTuple> pending = getJetStreamManagement()
                .onItem().transformToMulti(management -> Multi.createFrom().items(
                        streamConfigurations.stream()
                                .map(configuration -> new StreamSetupConfigurationTuple(configuration, management))));
        Uni<List<StreamResult>> results = pending
                .onItem().transformToUniAndMerge(tuple -> addOrUpdateStream(tuple.jetStreamManagement(), tuple.configuration()))
                .collect().asList();
        return context().executeBlocking(results);
    }

    /**
     * Adds {@code subject} to the subjects of the stream {@code streamName}.
     *
     * <p>The stream is only updated when the subject is not configured yet. A
     * {@link JetStreamApiException} or {@link IOException} raised by the update is rethrown wrapped
     * in a {@link SystemException}.
     *
     * @param streamName name of the stream to modify
     * @param subject subject to add
     * @return a {@link Uni} emitting {@code null} once the subject is present
     */
    @Override
    public Uni<Void> addSubject(String streamName, String subject) {
        Uni<Void> addition = getStreamInfo(streamName)
                .onItem().transformToUni(info -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        io.nats.client.api.StreamConfiguration current = info.streamInfo().getConfiguration();
                        HashSet<String> subjects = new HashSet<>(current.getSubjects());
                        // add() reports whether the set changed, i.e. whether the subject was missing.
                        if (subjects.add(subject)) {
                            info.jetStreamManagement().updateStream(
                                    io.nats.client.api.StreamConfiguration.builder(current).subjects(subjects).build());
                        }
                        return null;
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(addition);
    }

    /**
     * Removes {@code subject} from the subjects of the stream {@code streamName}.
     *
     * <p>The stream is only updated when the subject is currently configured. A
     * {@link JetStreamApiException} or {@link IOException} raised by the update is rethrown wrapped
     * in a {@link SystemException}.
     *
     * @param streamName name of the stream to modify
     * @param subject subject to remove
     * @return a {@link Uni} emitting {@code null} once the subject is absent
     */
    @Override
    public Uni<Void> removeSubject(String streamName, String subject) {
        Uni<Void> removal = getStreamInfo(streamName)
                .onItem().transformToUni(info -> Uni.createFrom().item(Unchecked.supplier(() -> {
                    try {
                        io.nats.client.api.StreamConfiguration current = info.streamInfo().getConfiguration();
                        // Typed set (was a raw HashSet) so the subjects(...) call is checked by the compiler.
                        HashSet<String> subjects = new HashSet<>(current.getSubjects());
                        // remove() reports whether the subject was present, so no separate contains() lookup.
                        if (subjects.remove(subject)) {
                            info.jetStreamManagement().updateStream(
                                    io.nats.client.api.StreamConfiguration.builder(current).subjects(subjects).build());
                        }
                        return null;
                    } catch (JetStreamApiException | IOException failure) {
                        throw new SystemException(failure);
                    }
                })));
        return context().executeBlocking(removal);
    }

    /**
     * Obtains a management handle and reads the info of the stream {@code streamName} through it.
     *
     * @param streamName name of the stream to read
     * @return a {@link Uni} emitting the stream info together with the handle used to read it
     */
    private Uni<StreamInfoTuple> getStreamInfo(String streamName) {
        Uni<JetStreamManagement> management = getJetStreamManagement();
        return management.onItem().transformToUni(jsm -> getStreamInfo(jsm, streamName));
    }

    /**
     * Reads the info of the stream {@code streamName}, requested with
     * {@code StreamInfoOptions.allSubjects()}, through the given management handle.
     *
     * <p>Any {@link Throwable} raised while reading or emitting is reported as a
     * {@link SystemException} that keeps the original failure as its cause.
     *
     * @param jsm management handle used for the request
     * @param streamName name of the stream to read
     * @return a {@link Uni} emitting the stream info paired with {@code jsm}
     */
    private Uni<StreamInfoTuple> getStreamInfo(JetStreamManagement jsm, String streamName) {
        return Uni.createFrom().emitter(sink -> {
            try {
                StreamInfo info = jsm.getStreamInfo(streamName, StreamInfoOptions.allSubjects());
                sink.complete(new StreamInfoTuple(info, jsm));
            } catch (Throwable failure) {
                String message = String.format("Unable to read stream %s with message: %s", streamName, failure.getMessage());
                sink.fail(new SystemException(message, failure));
            }
        });
    }

    /**
     * Updates the stream described by {@code setupConfiguration} when its info can be read, and
     * creates it otherwise.
     *
     * <p>Only a failed lookup falls back to {@code createStream}. A failure raised by
     * {@code updateStream} is propagated as is. Previously the recovery was attached after the
     * update step, so a failed update also triggered a create attempt and the update error was
     * replaced by the outcome of that attempt.
     *
     * @param jsm management handle used for lookup, update and creation
     * @param setupConfiguration desired stream setup
     * @return a {@link Uni} emitting the result of the update or creation
     */
    private Uni<StreamResult> addOrUpdateStream(JetStreamManagement jsm, StreamSetupConfiguration setupConfiguration) {
        // Turn "lookup failed" into an empty Optional so the recovery cannot swallow update failures.
        Uni<Optional<StreamInfoTuple>> existing = getStreamInfo(jsm, setupConfiguration.configuration().name())
                .onItem().transform(Optional::of)
                .onFailure().recoverWithItem(Optional.empty());
        return existing.onItem().transformToUni(found -> {
            if (found.isPresent()) {
                StreamInfoTuple tuple = found.get();
                return updateStream(tuple.jetStreamManagement(), tuple.streamInfo(), setupConfiguration);
            }
            return createStream(jsm, setupConfiguration.configuration());
        });
    }

    /**
     * Creates a stream from {@code streamConfiguration}.
     *
     * <p>The configuration is converted with a {@link StreamConfigurationFactory} and added through
     * {@code jsm}. Any {@link Exception} raised on the way is rethrown as a {@link SetupException}
     * that keeps the original failure as its cause.
     *
     * @param jsm management handle used to add the stream
     * @param streamConfiguration configuration of the stream to create
     * @return a {@link Uni} emitting a result with status {@code Created}
     */
    private Uni<StreamResult> createStream(JetStreamManagement jsm, StreamConfiguration streamConfiguration) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                io.nats.client.api.StreamConfiguration natsConfiguration =
                        new StreamConfigurationFactory().create(streamConfiguration);
                jsm.addStream(natsConfiguration);
                return StreamResult.builder()
                        .configuration(streamConfiguration)
                        .status(StreamStatus.Created)
                        .build();
            } catch (Exception failure) {
                String message = String.format("Unable to create stream: %s with message: %s", streamConfiguration.name(), failure.getMessage());
                throw new SetupException(message, failure);
            }
        }));
    }

    /**
     * Brings an existing stream in line with {@code setupConfiguration}.
     *
     * <p>The {@link StreamConfigurationFactory} compares the current and the desired configuration.
     * When it yields a configuration the stream is updated (status {@code Updated}), otherwise
     * nothing is sent (status {@code NotModified}). Any {@link Exception} is logged and rethrown as
     * a {@link SetupException}. If that failure's cause is a {@link JetStreamApiException} and the
     * setup allows overwriting, the stream is deleted and created again instead.
     *
     * @param jsm management handle used for update, delete and create
     * @param streamInfo current info of the stream
     * @param setupConfiguration desired stream setup
     * @return a {@link Uni} emitting the result of the update or of the delete-and-create fallback
     */
    private Uni<StreamResult> updateStream(JetStreamManagement jsm, StreamInfo streamInfo, StreamSetupConfiguration setupConfiguration) {
        Uni<StreamResult> attempt = Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                io.nats.client.api.StreamConfiguration current = streamInfo.getConfiguration();
                StreamConfigurationFactory factory = new StreamConfigurationFactory();
                Optional<io.nats.client.api.StreamConfiguration> changed = factory.create(current, setupConfiguration.configuration());
                if (changed.isEmpty()) {
                    return StreamResult.builder().configuration(setupConfiguration.configuration()).status(StreamStatus.NotModified).build();
                }
                log.debugf("Updating stream %s", setupConfiguration.configuration().name());
                jsm.updateStream(changed.get());
                return StreamResult.builder().configuration(setupConfiguration.configuration()).status(StreamStatus.Updated).build();
            } catch (Exception failure) {
                log.errorf(failure, "message: %s", failure.getMessage());
                String message = String.format("Unable to update stream: %s with message: %s", setupConfiguration.configuration().name(), failure.getMessage());
                throw new SetupException(message, failure);
            }
        }));
        return attempt.onFailure().recoverWithUni(failure -> {
            boolean rejectedByServer = failure.getCause() instanceof JetStreamApiException;
            if (!rejectedByServer || !setupConfiguration.overwrite()) {
                return Uni.createFrom().failure(failure);
            }
            // Overwrite requested: replace the stream instead of updating it in place.
            return deleteStream(jsm, setupConfiguration.configuration().name())
                    .onItem().transformToUni(ignored -> createStream(jsm, setupConfiguration.configuration()));
        });
    }

    /**
     * Deletes the stream {@code streamName}.
     *
     * <p>A {@link JetStreamApiException} or {@link IOException} raised by the client is rethrown as
     * a {@link SetupException} that keeps the original failure as its cause.
     *
     * @param jsm management handle used to delete the stream
     * @param streamName name of the stream to delete
     * @return a {@link Uni} emitting {@code null} once the call has returned
     */
    private Uni<Void> deleteStream(JetStreamManagement jsm, String streamName) {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            try {
                jsm.deleteStream(streamName);
            } catch (JetStreamApiException | IOException failure) {
                String message = String.format("Unable to delete stream: %s with message: %s", streamName, failure.getMessage());
                throw new SetupException(message, failure);
            }
            return null;
        }));
    }

    /**
     * Purges a single stream on a best-effort basis.
     *
     * <p>A {@link JetStreamApiException} or {@link IOException} is logged as a warning and reported
     * as an empty {@link Optional} instead of being propagated.
     *
     * @param jetStreamManagement management handle used to purge
     * @param streamName name of the stream to purge
     * @return the purge result, or an empty {@link Optional} when the purge call failed
     */
    private Optional<PurgeResult> purgeStream(JetStreamManagement jetStreamManagement, String streamName) {
        PurgeResponse outcome;
        try {
            outcome = jetStreamManagement.purgeStream(streamName);
        } catch (JetStreamApiException | IOException e) {
            log.warnf(e, "Unable to purge stream %s with message: %s", streamName, e.getMessage());
            return Optional.empty();
        }
        PurgeResult result = PurgeResult.builder()
                .streamName(streamName)
                .success(outcome.isSuccess())
                .purgeCount(outcome.getPurged())
                .build();
        return Optional.of(result);
    }

    /**
     * Purges the given streams one after another and keeps the results of those that succeeded.
     *
     * @param streams names of the streams to purge
     * @return a {@link Uni} emitting the purge results; streams whose purge failed are left out
     */
    private Uni<List<PurgeResult>> purgeAllStreams(List<String> streams) {
        return getJetStreamManagement()
                .onItem().transformToUni(management -> Uni.createFrom().item(Unchecked.supplier(() -> streams.stream()
                        .map(name -> purgeStream(management, name))
                        .flatMap(Optional::stream)
                        .toList())));
    }

    /**
     * Obtains a {@link JetStreamManagement} handle from the connection.
     *
     * <p>The call is made inside {@code Unchecked.supplier}, so it happens when the returned
     * {@link Uni} is subscribed to rather than when this method is called.
     *
     * @return a {@link Uni} emitting the management handle
     */
    private Uni<JetStreamManagement> getJetStreamManagement() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            JetStreamManagement management = connection.jetStreamManagement();
            return management;
        }));
    }

    /**
     * Returns the Vert.x context the management calls are submitted to.
     *
     * @return the result of {@code vertx.getOrCreateContext()}
     */
    private Context context() {
        Context current = vertx.getOrCreateContext();
        return current;
    }

    /**
     * Creates a stream management facade over the given collaborators.
     *
     * @param connection NATS connection the management handles are obtained from
     * @param streamStateMapper mapper used for stream state results
     * @param consumerMapper mapper used for consumer results
     * @param vertx Vert.x instance whose context the calls are submitted to
     */
    @Generated
    public DefaultStreamManagement(Connection connection, StreamStateMapper streamStateMapper, ConsumerMapper consumerMapper, Vertx vertx) {
        this.vertx = vertx;
        this.connection = connection;
        this.consumerMapper = consumerMapper;
        this.streamStateMapper = streamStateMapper;
    }

    /**
     * Pairs a {@link StreamInfo} with a {@link JetStreamManagement} handle so follow-up calls can
     * reuse that handle.
     *
     * @param streamInfo info of the stream
     * @param jetStreamManagement management handle carried along with the info
     */
    private record StreamInfoTuple(StreamInfo streamInfo, JetStreamManagement jetStreamManagement) {
        /**
         * Starts a builder with both components unset.
         *
         * @return a new, empty builder
         */
        @Generated
        public static StreamInfoTupleBuilder builder() {
            return new StreamInfoTupleBuilder();
        }

        /** Fluent builder for {@link StreamInfoTuple}; unset components stay {@code null}. */
        @Generated
        public static class StreamInfoTupleBuilder {
            @Generated
            private StreamInfo streamInfo;
            @Generated
            private JetStreamManagement jetStreamManagement;

            @Generated
            StreamInfoTupleBuilder() {
            }

            /**
             * Sets the management handle.
             *
             * @param jetStreamManagement handle to store
             * @return this builder
             */
            @Generated
            public StreamInfoTupleBuilder jetStreamManagement(JetStreamManagement jetStreamManagement) {
                this.jetStreamManagement = jetStreamManagement;
                return this;
            }

            /**
             * Sets the stream info.
             *
             * @param streamInfo info to store
             * @return this builder
             */
            @Generated
            public StreamInfoTupleBuilder streamInfo(StreamInfo streamInfo) {
                this.streamInfo = streamInfo;
                return this;
            }

            /**
             * Creates the tuple from the values set so far.
             *
             * @return a new {@link StreamInfoTuple}
             */
            @Generated
            public StreamInfoTuple build() {
                return new StreamInfoTuple(this.streamInfo, this.jetStreamManagement);
            }

            /** Renders the builder's current values for diagnostics. */
            @Generated
            public String toString() {
                return new StringBuilder("DefaultStreamManagement.StreamInfoTuple.StreamInfoTupleBuilder(streamInfo=")
                        .append(this.streamInfo)
                        .append(", jetStreamManagement=")
                        .append(this.jetStreamManagement)
                        .append(")")
                        .toString();
            }
        }
    }

    /**
     * Pairs a {@link StreamSetupConfiguration} with the {@link JetStreamManagement} handle that
     * should apply it.
     *
     * @param configuration stream setup to apply
     * @param jetStreamManagement management handle carried along with the setup
     */
    private record StreamSetupConfigurationTuple(StreamSetupConfiguration configuration, JetStreamManagement jetStreamManagement) {
        /**
         * Starts a builder with both components unset.
         *
         * @return a new, empty builder
         */
        @Generated
        public static StreamSetupConfigurationTupleBuilder builder() {
            return new StreamSetupConfigurationTupleBuilder();
        }

        /** Fluent builder for {@link StreamSetupConfigurationTuple}; unset components stay {@code null}. */
        @Generated
        public static class StreamSetupConfigurationTupleBuilder {
            @Generated
            private StreamSetupConfiguration configuration;
            @Generated
            private JetStreamManagement jetStreamManagement;

            @Generated
            StreamSetupConfigurationTupleBuilder() {
            }

            /**
             * Sets the management handle.
             *
             * @param jetStreamManagement handle to store
             * @return this builder
             */
            @Generated
            public StreamSetupConfigurationTupleBuilder jetStreamManagement(JetStreamManagement jetStreamManagement) {
                this.jetStreamManagement = jetStreamManagement;
                return this;
            }

            /**
             * Sets the stream setup.
             *
             * @param configuration setup to store
             * @return this builder
             */
            @Generated
            public StreamSetupConfigurationTupleBuilder configuration(StreamSetupConfiguration configuration) {
                this.configuration = configuration;
                return this;
            }

            /**
             * Creates the tuple from the values set so far.
             *
             * @return a new {@link StreamSetupConfigurationTuple}
             */
            @Generated
            public StreamSetupConfigurationTuple build() {
                return new StreamSetupConfigurationTuple(this.configuration, this.jetStreamManagement);
            }

            /** Renders the builder's current values for diagnostics. */
            @Generated
            public String toString() {
                return new StringBuilder("DefaultStreamManagement.StreamSetupConfigurationTuple.StreamSetupConfigurationTupleBuilder(configuration=")
                        .append(this.configuration)
                        .append(", jetStreamManagement=")
                        .append(this.jetStreamManagement)
                        .append(")")
                        .toString();
            }
        }
    }
}

