package io.datakernel.aggregation;

import io.datakernel.aggregation.ot.AggregationStructure;
import io.datakernel.async.Stage;
import io.datakernel.bytebuf.ByteBuf;
import io.datakernel.codegen.DefiningClassLoader;
import io.datakernel.eventloop.Eventloop;
import io.datakernel.jmx.EventloopJmxMBeanEx;
import io.datakernel.jmx.JmxAttribute;
import io.datakernel.jmx.JmxOperation;
import io.datakernel.jmx.StageStats;
import io.datakernel.remotefs.IRemoteFsClient;
import io.datakernel.remotefs.RemoteFsClient;
import io.datakernel.stream.StreamConsumerWithResult;
import io.datakernel.stream.StreamProducerWithResult;
import io.datakernel.stream.processor.StreamBinaryDeserializer;
import io.datakernel.stream.processor.StreamBinarySerializer;
import io.datakernel.stream.processor.StreamByteChunker;
import io.datakernel.stream.processor.StreamLZ4Compressor;
import io.datakernel.stream.processor.StreamLZ4Decompressor;
import io.datakernel.stream.processor.StreamTransformer;
import io.datakernel.stream.stats.StreamStats;
import io.datakernel.stream.stats.StreamStatsBasic;
import io.datakernel.stream.stats.StreamStatsDetailed;
import io.datakernel.stream.stats.StreamStatsSizeCounter;
import io.datakernel.util.MemSize;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/* loaded from: input_file:io/datakernel/aggregation/RemoteFsChunkStorage.class */
public final class RemoteFsChunkStorage implements AggregationChunkStorage, EventloopJmxMBeanEx {
    public static final String LOG = ".log";
    public static final String TEMP_LOG = ".temp";
    private final Eventloop eventloop;
    private final IRemoteFsClient client;
    private final IdGenerator<Long> idGenerator;
    private boolean detailed;
    public static final MemSize DEFAULT_BUFFER_SIZE = MemSize.kilobytes(256);
    public static final Duration DEFAULT_SMOOTHING_WINDOW = Duration.ofMinutes(5);
    private MemSize bufferSize = DEFAULT_BUFFER_SIZE;
    private final StageStats stageIdGenerator = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR1 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR2 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenR3 = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageOpenW = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StageStats stageFinishChunks = StageStats.create(DEFAULT_SMOOTHING_WINDOW);
    private final StreamStatsDetailed<ByteBuf> readRemoteFS = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> readDecompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsBasic<?> readDeserialize = StreamStats.basic();
    private final StreamStatsDetailed<?> readDeserializeDetailed = StreamStats.detailed();
    private final StreamStatsBasic<?> writeSerialize = StreamStats.basic();
    private final StreamStatsDetailed<?> writeSerializeDetailed = StreamStats.detailed();
    private final StreamStatsDetailed<ByteBuf> writeCompress = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeChunker = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());
    private final StreamStatsDetailed<ByteBuf> writeRemoteFS = StreamStats.detailed(StreamStatsSizeCounter.forByteBufs());

    private RemoteFsChunkStorage(Eventloop eventloop, IdGenerator<Long> idGenerator, InetSocketAddress inetSocketAddress) {
        this.eventloop = eventloop;
        this.idGenerator = idGenerator;
        this.client = RemoteFsClient.create(eventloop, inetSocketAddress);
    }

    public static RemoteFsChunkStorage create(Eventloop eventloop, IdGenerator<Long> idGenerator, InetSocketAddress inetSocketAddress) {
        return new RemoteFsChunkStorage(eventloop, idGenerator, inetSocketAddress);
    }

    public RemoteFsChunkStorage withBufferSize(MemSize memSize) {
        this.bufferSize = memSize;
        return this;
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Stage<StreamProducerWithResult<T, Void>> read(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        return this.client.download(path(j), 0L).whenComplete(this.stageOpenR1.recordStats()).thenApply(streamProducerWithResult -> {
            return streamProducerWithResult.with(this.readRemoteFS).with(StreamLZ4Decompressor.create()).with(this.readDecompress).with(StreamBinaryDeserializer.create(AggregationUtils.createBufferSerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader))).with(this.detailed ? this.readDeserializeDetailed : this.readDeserialize).withEndOfStreamAsResult().withLateBinding();
        });
    }

    private String path(long j) {
        return j + ".log";
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public <T> Stage<StreamConsumerWithResult<T, Void>> write(AggregationStructure aggregationStructure, List<String> list, Class<T> cls, long j, DefiningClassLoader definingClassLoader) {
        return this.client.upload(tempPath(j)).whenComplete(this.stageOpenW.recordStats()).thenApply(streamConsumerWithResult -> {
            return StreamTransformer.idenity().with(this.detailed ? this.writeSerializeDetailed : this.writeSerialize).with(StreamBinarySerializer.create(AggregationUtils.createBufferSerializer(aggregationStructure, cls, aggregationStructure.getKeys(), list, definingClassLoader)).withInitialBufferSize(this.bufferSize)).with(this.writeCompress).with(StreamLZ4Compressor.fastCompressor()).with(this.writeChunker).with(StreamByteChunker.create(this.bufferSize.map(l -> {
                return Long.valueOf(l.longValue() / 2);
            }), this.bufferSize.map(l2 -> {
                return Long.valueOf(l2.longValue() * 2);
            }))).with(this.writeRemoteFS).applyTo(streamConsumerWithResult);
        });
    }

    private String tempPath(long j) {
        return j + ".temp";
    }

    @Override // io.datakernel.aggregation.AggregationChunkStorage
    public Stage<Void> finish(Set<Long> set) {
        return this.client.move((Map) set.stream().collect(Collectors.toMap((v1) -> {
            return tempPath(v1);
        }, (v1) -> {
            return path(v1);
        }))).whenComplete(this.stageFinishChunks.recordStats());
    }

    @Override // io.datakernel.aggregation.IdGenerator
    public Stage<Long> createId() {
        return this.idGenerator.createId().whenComplete(this.stageIdGenerator.recordStats());
    }

    public Eventloop getEventloop() {
        return this.eventloop;
    }

    @JmxAttribute
    public StageStats getStageIdGenerator() {
        return this.stageIdGenerator;
    }

    @JmxAttribute
    public StageStats getStageOpenR1() {
        return this.stageOpenR1;
    }

    @JmxAttribute
    public StageStats getStageOpenR2() {
        return this.stageOpenR2;
    }

    @JmxAttribute
    public StageStats getStageOpenR3() {
        return this.stageOpenR3;
    }

    @JmxAttribute
    public StageStats getStageOpenW() {
        return this.stageOpenW;
    }

    @JmxAttribute
    public StageStats getStageFinishChunks() {
        return this.stageFinishChunks;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadRemoteFS() {
        return this.readRemoteFS;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDecompress() {
        return this.readDecompress;
    }

    @JmxAttribute
    public StreamStatsBasic getReadDeserialize() {
        return this.readDeserialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getReadDeserializeDetailed() {
        return this.readDeserializeDetailed;
    }

    @JmxAttribute
    public StreamStatsBasic getWriteSerialize() {
        return this.writeSerialize;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteSerializeDetailed() {
        return this.writeSerializeDetailed;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteCompress() {
        return this.writeCompress;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteChunker() {
        return this.writeChunker;
    }

    @JmxAttribute
    public StreamStatsDetailed getWriteRemoteFS() {
        return this.writeRemoteFS;
    }

    @JmxOperation
    public void startDetailedMonitoring() {
        this.detailed = true;
    }

    @JmxOperation
    public void stopDetailedMonitoring() {
        this.detailed = false;
    }
}
