package io.activej.cube.aggregation;

import io.activej.async.function.AsyncSupplier;
import io.activej.async.service.ReactiveService;
import io.activej.bytebuf.ByteBuf;
import io.activej.codegen.DefiningClassLoader;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.MemSize;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.exception.MalformedDataException;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.process.frame.ChannelFrameDecoder;
import io.activej.csp.process.frame.ChannelFrameEncoder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.csp.process.frame.FrameFormats;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.csp.supplier.ChannelSuppliers;
import io.activej.cube.AggregationStructure;
import io.activej.cube.aggregation.util.Utils;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.transformer.StreamSupplierTransformer;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.stats.StreamStatsSizeCounter;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.jmx.stats.ValueStats;
import io.activej.promise.Promise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.minio.BucketExistsArgs;
import io.minio.GetObjectArgs;
import io.minio.ListObjectsArgs;
import io.minio.MinioAsyncClient;
import io.minio.PutObjectArgs;
import io.minio.RemoveObjectsArgs;
import io.minio.Result;
import io.minio.messages.DeleteError;
import io.minio.messages.DeleteObject;
import io.minio.messages.Item;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/* loaded from: input_file:io/activej/cube/aggregation/MinioChunkStorage.class */
public final class MinioChunkStorage extends AbstractReactive implements IAggregationChunkStorage, ReactiveService, ReactiveJmxBeanWithStats {
    /** Suffix appended to every chunk object name stored in the bucket. */
    public static final String LOG = ".log";
    /** Asynchronous source of new chunk ids, used by {@link #createId()}. */
    private final AsyncSupplier<Long> idGenerator;
    /** MinIO client used for all bucket operations. */
    private final MinioAsyncClient client;
    /** Executor passed to the blocking stream adapters and to {@code Promise.ofBlocking}. */
    private final Executor executor;
    /** Name of the bucket that holds the chunk objects. */
    private final String bucket;
    /** Part size handed to {@code PutObjectArgs.stream(...)} when uploading a chunk. */
    private long partSize;
    /** Frame format used to encode chunks on write and decode them on read. */
    private FrameFormat frameFormat;
    /** Initial buffer size of the serializer used on write. */
    private MemSize bufferSize;
    // Promise statistics, one per storage operation.
    private final PromiseStats promiseAsyncSupplier;
    private final PromiseStats promiseList;
    private final PromiseStats promiseOpenR;
    private final PromiseStats promiseOpenW;
    private final PromiseStats promiseDelete;
    // Stream statistics attached to the stages of the read pipeline.
    private final DetailedStreamStats<ByteBuf> readFile;
    private final DetailedStreamStats<ByteBuf> readDecompress;
    private final BasicStreamStats<?> readDeserialize;
    private final DetailedStreamStats<?> readDeserializeDetailed;
    // Stream statistics attached to the stages of the write pipeline.
    private final DetailedStreamStats<ByteBuf> writeFile;
    private final DetailedStreamStats<ByteBuf> writeCompress;
    private final BasicStreamStats<?> writeSerialize;
    private final DetailedStreamStats<?> writeSerializeDetailed;
    // Counters exposed over JMX.
    private final ValueStats chunksCount;
    private final EventStats readChunks;
    private final EventStats writtenChunks;
    private final EventStats deletedChunks;
    /** When true, the detailed (de)serialization stream stats are used instead of the basic ones. */
    private boolean detailed;
    // NOTE(review): the checks flag is keyed on AggregationChunkStorage, not on this class — confirm this is intended.
    private static final boolean CHECKS = Checks.isEnabled(AggregationChunkStorage.class);
    /** Default upload part size; overridable through the "partSize" application setting (fallback 10485760 bytes). */
    public static final int DEFAULT_PART_SIZE = ApplicationSettings.getInt(MinioChunkStorage.class, "partSize", 10485760).intValue();
    /** Default initial serializer buffer size (256 kilobytes). */
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256);
    /** Default smoothing window of all stats; overridable through the "smoothingWindow" application setting. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = ApplicationSettings.getDuration(MinioChunkStorage.class, "smoothingWindow", Duration.ofMinutes(5));

    /* loaded from: input_file:io/activej/cube/aggregation/MinioChunkStorage$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, MinioChunkStorage> {
        private Builder() {
        }

        public Builder withPartSize(int i) {
            checkNotBuilt(this);
            MinioChunkStorage.this.partSize = i;
            return this;
        }

        public Builder withFrameFormat(FrameFormat frameFormat) {
            checkNotBuilt(this);
            MinioChunkStorage.this.frameFormat = frameFormat;
            return this;
        }

        public Builder withBufferSize(MemSize memSize) {
            checkNotBuilt(this);
            MinioChunkStorage.this.bufferSize = memSize;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public MinioChunkStorage m16doBuild() {
            return MinioChunkStorage.this;
        }
    }

    private MinioChunkStorage(Reactor reactor, AsyncSupplier<Long> asyncSupplier, MinioAsyncClient minioAsyncClient, Executor executor, String str) {
        super(reactor);
        this.partSize = DEFAULT_PART_SIZE;
        this.frameFormat = FrameFormats.lz4();
        this.bufferSize = DEFAULT_BUFFER_SIZE;
        this.promiseAsyncSupplier = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseList = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseOpenR = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseOpenW = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.promiseDelete = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.readFile = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
        this.readDecompress = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
        this.readDeserialize = StreamStats.basic();
        this.readDeserializeDetailed = StreamStats.detailed();
        this.writeFile = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
        this.writeCompress = (DetailedStreamStats) StreamStats.detailedBuilder().withSizeCounter(StreamStatsSizeCounter.forByteBufs()).build();
        this.writeSerialize = StreamStats.basic();
        this.writeSerializeDetailed = StreamStats.detailed();
        this.chunksCount = ValueStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.readChunks = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.writtenChunks = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.deletedChunks = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        this.idGenerator = asyncSupplier;
        this.client = minioAsyncClient;
        this.executor = executor;
        this.bucket = str;
    }

    /**
     * Creates a storage with default settings; shorthand for obtaining a builder with the
     * same arguments and building it immediately.
     */
    public static MinioChunkStorage create(Reactor reactor, AsyncSupplier<Long> asyncSupplier, MinioAsyncClient minioAsyncClient, Executor executor, String str) {
        Builder storageBuilder = builder(reactor, asyncSupplier, minioAsyncClient, executor, str);
        return storageBuilder.build();
    }

    /**
     * Starts building a storage for the given bucket.
     *
     * @param reactor          reactor the storage is bound to
     * @param asyncSupplier    asynchronous generator of chunk ids
     * @param minioAsyncClient MinIO client used for bucket operations
     * @param executor         executor for blocking stream and listing work
     * @param str              bucket name
     * @return a builder bound to a freshly created storage instance
     */
    public static Builder builder(Reactor reactor, AsyncSupplier<Long> asyncSupplier, MinioAsyncClient minioAsyncClient, Executor executor, String str) {
        // Builder is an inner class: it needs an enclosing storage instance, which is the object being configured.
        return new MinioChunkStorage(reactor, asyncSupplier, minioAsyncClient, executor, str).new Builder();
    }

    /**
     * Requests a new chunk id from the id generator. Failures are rewrapped into an
     * {@link AggregationException}, and the outcome is recorded in the id-generator promise stats.
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public Promise<Long> createId() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        Promise<Long> generated = this.idGenerator.get()
                .mapException(cause -> new AggregationException("Could not create ID", cause));
        return generated.whenComplete(this.promiseAsyncSupplier.recordStats());
    }

    /**
     * Opens chunk {@code j} for reading: downloads the object from the bucket and exposes it
     * as a stream of deserialized records.
     *
     * <p>The download outcome is recorded in the open-for-read promise stats. When the returned
     * stream ends, the MinIO response is closed, a read-chunk event is recorded on success, and
     * stream failures are rewrapped into an {@link AggregationException}.
     *
     * @return a promise of the record stream; it fails with an {@link AggregationException}
     *         when the download cannot be started or fails
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public <T> Promise<StreamSupplier<T>> read(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        try {
            return Promise.ofCompletionStage(this.client.getObject(GetObjectArgs.builder().bucket(this.bucket).object(toObjectName(j)).build())).mapException(exc -> {
                return new AggregationException("Failed to download chunk '" + j + "'", exc);
            }).whenComplete(this.promiseOpenR.recordStats()).map(getObjectResponse -> {
                // Pipeline: raw bytes -> readFile stats -> frame decoder -> readDecompress stats -> deserializer -> (detailed or basic) deserialize stats.
                return ((StreamSupplier) ((StreamSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ChannelSuppliers.ofInputStream(this.executor, getObjectResponse).transformWith(this.readFile)).transformWith(ChannelFrameDecoder.create(this.frameFormat))).transformWith(this.readDecompress)).transformWith(ChannelDeserializer.create(Utils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)))).transformWith(this.detailed ? this.readDeserializeDetailed : this.readDeserialize)).withEndOfStream(promise -> {
                    Objects.requireNonNull(getObjectResponse);
                    // Close the response whether the stream ended normally or with an error.
                    Promise whenComplete = promise.whenComplete(getObjectResponse::close);
                    EventStats eventStats = this.readChunks;
                    Objects.requireNonNull(eventStats);
                    return whenComplete.whenResult(eventStats::recordEvent).mapException(exc2 -> {
                        return new AggregationException("Failed to read chunk '" + j + "'", exc2);
                    });
                });
            });
        } catch (Exception e) {
            // Synchronous failure while building or issuing the request; still recorded in the open-for-read stats.
            return Promise.ofException(new AggregationException("Failed to download chunk '" + j + "'", e)).whenComplete(this.promiseOpenR.recordStats());
        }
    }

    /**
     * Opens chunk {@code j} for writing.
     *
     * <p>The upload is started first, reading from a {@link PipedInputStream}; the returned
     * consumer serializes records, frame-encodes them and writes the bytes into the connected
     * {@link PipedOutputStream}. The consumer's acknowledgement completes only when both the
     * stream and the upload have completed; on success a written-chunk event is recorded,
     * and failures are rewrapped into an {@link AggregationException}.
     *
     * @return a promise of the record consumer; it fails with an {@link AggregationException}
     *         when the pipe or the upload cannot be set up
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public <T> Promise<StreamConsumer<T>> write(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        PipedOutputStream pipedOutputStream = new PipedOutputStream();
        // A single handler covers both the IOException from connecting the pipe and any failure
        // to start the upload; a second, outer IOException handler would be unreachable.
        try {
            // Object size is unknown up front (-1), so the configured part size drives the upload.
            CompletableFuture<?> putObject = this.client.putObject(PutObjectArgs.builder().bucket(this.bucket).object(toObjectName(j)).stream(new PipedInputStream(pipedOutputStream), -1L, this.partSize).build());
            return Promise.of(StreamConsumers.ofSupplier(streamSupplier -> {
                // Pipeline: (detailed or basic) serialize stats -> serializer -> writeCompress stats -> frame encoder -> writeFile stats -> pipe.
                return ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((ChannelSupplier) ((StreamSupplier) streamSupplier.transformWith(this.detailed ? this.writeSerializeDetailed : this.writeSerialize)).transformWith((StreamSupplierTransformer) ChannelSerializer.builder(Utils.createBinarySerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)).withInitialBufferSize(this.bufferSize).build())).transformWith(this.writeCompress)).transformWith(ChannelFrameEncoder.create(this.frameFormat))).transformWith(this.writeFile)).streamTo(ChannelConsumers.ofOutputStream(this.executor, pipedOutputStream));
            }).withAcknowledgement(promise -> {
                // The chunk counts as written only once the upload itself has finished as well.
                Promise both = promise.both(Promise.ofCompletionStage(putObject));
                EventStats eventStats = this.writtenChunks;
                Objects.requireNonNull(eventStats);
                return both.whenResult(eventStats::recordEvent).mapException(exc -> {
                    return new AggregationException("Failed to write chunk '" + j + "'", exc);
                });
            })).whenComplete(this.promiseOpenW.recordStats());
        } catch (Exception e) {
            return Promise.ofException(new AggregationException("Failed to upload chunk '" + j + "'", e)).whenComplete(this.promiseOpenW.recordStats());
        }
    }

    /**
     * No-op for this storage: after verifying the reactor thread, it returns an already
     * completed promise without touching the given chunk ids.
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public Promise<Void> finish(Set<Long> set) {
        Reactive.checkInReactorThread(this);
        return Promise.complete();
    }

    /**
     * Lists the ids of all chunks in the bucket. The listing is iterated on the executor,
     * each object name is converted back to a chunk id, the resulting count is recorded
     * in {@code chunksCount}, and the outcome is recorded in the list promise stats.
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public Promise<Set<Long>> listChunks() {
        Reactive.checkInReactorThread(this);
        ListObjectsArgs listArgs = ListObjectsArgs.builder().bucket(this.bucket).build();
        Iterable<Result<Item>> listing = this.client.listObjects(listArgs);
        return Promise.ofBlocking(this.executor, () -> {
            Set<Long> chunkIds = new HashSet<>();
            for (Result<Item> entry : listing) {
                Item item = entry.get();
                chunkIds.add(fromObjectName(item.objectName()));
            }
            return chunkIds;
        }).whenResult(ids -> this.chunksCount.recordValue(ids.size())).whenComplete(this.promiseList.recordStats());
    }

    /**
     * Deletes the objects of the given chunks from the bucket.
     *
     * <p>The removal results are iterated on the executor. The first result that cannot be
     * obtained, or that carries a {@link DeleteError}, fails the promise with an
     * {@link AggregationException}. The outcome is recorded in the delete promise stats, and
     * on success {@code set.size()} deleted-chunk events are recorded.
     *
     * @param set ids of the chunks to delete
     * @return a promise that completes once all removal results have been consumed
     */
    @Override // io.activej.cube.aggregation.IAggregationChunkStorage
    public Promise<Void> deleteChunks(Set<Long> set) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        List<DeleteObject> objects = set.stream().map(this::toObjectName).map(DeleteObject::new).toList();
        Iterable<Result<DeleteError>> removeObjects = this.client.removeObjects(RemoveObjectsArgs.builder().bucket(this.bucket).objects(objects).build());
        return Promise.ofBlocking(this.executor, () -> {
            for (Result<DeleteError> result : removeObjects) {
                DeleteError deleteError;
                try {
                    deleteError = result.get();
                } catch (Exception e) {
                    throw new AggregationException("Failed to delete chunks", e);
                }
                // Checked outside the try so this exception is not caught and re-wrapped by the
                // handler above, which would bury the error details one level down in the cause chain.
                if (deleteError != null) {
                    throw new AggregationException("Failed to delete chunks: " + deleteError);
                }
            }
        }).whenComplete(this.promiseDelete.recordStats()).whenResult(() -> {
            this.deletedChunks.recordEvents(set.size());
        });
    }

    /**
     * Starts the service by checking that the configured bucket exists. The returned promise
     * fails with an {@link AggregationException} when the check cannot be performed or the
     * bucket is missing.
     */
    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        try {
            BucketExistsArgs existsArgs = BucketExistsArgs.builder().bucket(this.bucket).build();
            Promise<Boolean> bucketExists = Promise.ofCompletionStage(this.client.bucketExists(existsArgs))
                    .mapException(cause -> new AggregationException("Failed to start storage", cause));
            return bucketExists.whenResult(exists -> {
                if (exists.booleanValue()) {
                    return;
                }
                throw new AggregationException("Bucket " + this.bucket + " does not exist");
            });
        } catch (Exception failure) {
            return Promise.ofException(new AggregationException("Failed to start storage", failure));
        }
    }

    /** Stops the service; nothing is released here, so an already completed promise is returned. */
    public Promise<?> stop() {
        return Promise.complete();
    }

    /** Builds the bucket object name of chunk {@code j}: its file name followed by the {@code .log} suffix. */
    private String toObjectName(long j) {
        String fileName = ChunkIdJsonCodec.toFileName(j);
        return fileName + LOG;
    }

    /**
     * Converts a bucket object name back to a chunk id by stripping the {@code .log} suffix
     * and parsing the remainder as a chunk file name.
     *
     * @param str object name as stored in the bucket
     * @return the chunk id encoded in the name
     * @throws MalformedDataException if the name does not end with {@code .log} or the
     *                                remainder is rejected by the chunk id codec
     */
    private long fromObjectName(String str) throws MalformedDataException {
        // Without this guard a foreign object such as "123.bak" would be taken for chunk 123,
        // and a name shorter than the suffix would fail with StringIndexOutOfBoundsException.
        if (!str.endsWith(LOG)) {
            throw new MalformedDataException("Object name '" + str + "' does not end with '" + LOG + "'");
        }
        return ChunkIdJsonCodec.fromFileName(str.substring(0, str.length() - LOG.length()));
    }

    // ---- JMX attributes: read-only views of the statistics collected by this storage ----

    /** Stats of the id-generator promise used by {@code createId()}. */
    @JmxAttribute
    public PromiseStats getPromiseAsyncSupplier() {
        return this.promiseAsyncSupplier;
    }

    /** Stats of the listing promise used by {@code listChunks()}. */
    @JmxAttribute
    public PromiseStats getPromiseList() {
        return this.promiseList;
    }

    /** Stats of the download promise used by {@code read(...)}. */
    @JmxAttribute
    public PromiseStats getPromiseOpenR() {
        return this.promiseOpenR;
    }

    /** Stats of the open-for-write promise used by {@code write(...)}. */
    @JmxAttribute
    public PromiseStats getPromiseOpenW() {
        return this.promiseOpenW;
    }

    /** Stream stats of the raw bytes read from the downloaded object. */
    @JmxAttribute
    public DetailedStreamStats getReadFile() {
        return this.readFile;
    }

    /** Stream stats of the bytes coming out of the frame decoder on read. */
    @JmxAttribute
    public DetailedStreamStats getReadDecompress() {
        return this.readDecompress;
    }

    /** Basic stream stats of deserialized records; used while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getReadDeserialize() {
        return this.readDeserialize;
    }

    /** Detailed stream stats of deserialized records; used while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    /** Basic stream stats of records entering the serializer; used while detailed monitoring is off. */
    @JmxAttribute
    public BasicStreamStats getWriteSerialize() {
        return this.writeSerialize;
    }

    /** Detailed stream stats of records entering the serializer; used while detailed monitoring is on. */
    @JmxAttribute
    public DetailedStreamStats getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    /** Stream stats of the serialized bytes entering the frame encoder on write. */
    @JmxAttribute
    public DetailedStreamStats getWriteCompress() {
        return this.writeCompress;
    }

    /** Stream stats of the frame-encoded bytes written to the upload pipe. */
    @JmxAttribute
    public DetailedStreamStats getWriteFile() {
        return this.writeFile;
    }

    /** Number of chunk streams that were read to the end successfully. */
    @JmxAttribute
    public EventStats getReadChunks() {
        return this.readChunks;
    }

    /** Number of chunks whose write was acknowledged successfully. */
    @JmxAttribute
    public EventStats getWrittenChunks() {
        return this.writtenChunks;
    }

    /** Number of chunks passed to successful {@code deleteChunks(...)} calls. */
    @JmxAttribute
    public EventStats getDeletedChunks() {
        return this.deletedChunks;
    }

    /** Chunk count observed by successful {@code listChunks()} calls. */
    @JmxAttribute
    public ValueStats getChunksCount() {
        return this.chunksCount;
    }

    /** Stats of the removal promise used by {@code deleteChunks(...)}. */
    @JmxAttribute
    public PromiseStats getPromiseDelete() {
        return this.promiseDelete;
    }

    /** Switches subsequently opened read/write pipelines to the detailed (de)serialization stream stats. */
    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    /** Switches subsequently opened read/write pipelines back to the basic (de)serialization stream stats. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }
}
