package io.activej.crdt.storage.local;

import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.service.ReactiveService;
import io.activej.bytebuf.ByteBuf;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.crdt.CrdtData;
import io.activej.crdt.CrdtException;
import io.activej.crdt.CrdtTombstone;
import io.activej.crdt.function.CrdtFilter;
import io.activej.crdt.function.CrdtFunction;
import io.activej.crdt.primitives.CrdtType;
import io.activej.crdt.storage.ICrdtStorage;
import io.activej.crdt.util.CrdtDataBinarySerializer;
import io.activej.crdt.util.Utils;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.consumer.ChannelConsumers;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.datastream.consumer.StreamConsumer;
import io.activej.datastream.consumer.StreamConsumers;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.reducer.Reducer;
import io.activej.datastream.processor.reducer.StreamReducer;
import io.activej.datastream.processor.transformer.AbstractStreamTransformer;
import io.activej.datastream.processor.transformer.StreamTransformers;
import io.activej.datastream.stats.BasicStreamStats;
import io.activej.datastream.stats.DetailedStreamStats;
import io.activej.datastream.stats.StreamStats;
import io.activej.datastream.supplier.StreamDataAcceptor;
import io.activej.datastream.supplier.StreamSupplier;
import io.activej.fs.FileMetadata;
import io.activej.fs.IFileSystem;
import io.activej.fs.exception.FileNotFoundException;
import io.activej.jmx.api.attribute.JmxAttribute;
import io.activej.jmx.api.attribute.JmxOperation;
import io.activej.jmx.stats.EventStats;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.SettablePromise;
import io.activej.promise.jmx.PromiseStats;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.activej.reactor.jmx.ReactiveJmxBeanWithStats;
import io.activej.serializer.BinaryInput;
import io.activej.serializer.BinaryOutput;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.BinarySerializers;
import io.activej.serializer.CorruptedDataException;
import java.lang.Comparable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.time.Duration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage.class */
public final class FileSystemCrdtStorage<K extends Comparable<K>, S> extends AbstractReactive implements ICrdtStorage<K, S>, ReactiveService, ReactiveJmxBeanWithStats {
    /** Logger shared by all instances of this storage. */
    private static final Logger logger = LoggerFactory.getLogger(FileSystemCrdtStorage.class);
    /** When {@code true}, the storage operations guarded by this flag verify that they run in the reactor thread. */
    private static final boolean CHECKS = Checks.isEnabled(FileSystemCrdtStorage.class);
    /** Smoothing window for the JMX statistics; read from the "smoothingWindow" application setting, one minute by default. */
    public static final Duration DEFAULT_SMOOTHING_WINDOW = ApplicationSettings.getDuration(FileSystemCrdtStorage.class, "smoothingWindow", Duration.ofMinutes(1));
    /** File name suffix; upload, remove and consolidation append the same ".bin" suffix to generated names. */
    public static final String FILE_EXTENSION = ".bin";
    /** File system that holds the serialized CRDT files. */
    private final IFileSystem fileSystem;
    /** Merge function applied to states that share a key. */
    private final CrdtFunction<S> function;
    /** Serializer for the on-disk item format (key, nullable state, timestamp). */
    private final BinarySerializer<CrdtReducingData<K, S>> serializer;

    /** Names of the files claimed by a running {@code take()}; {@code null} while no take is in progress. Consolidation skips these files. */
    @Nullable
    private Set<String> taken;
    /** Produces the base name (without extension) of every new file; set to a random UUID generator in the constructor. */
    private Supplier<String> namingStrategy;
    /** Predicate applied to a merged state before it is emitted; the constructor installs an accept-all filter. */
    private CrdtFilter<S> filter;
    /** Selects the detailed stream statistics instead of the basic ones; toggled through the JMX operations. */
    private boolean detailedStats;
    /** Wrapper around {@code doConsolidate} created with {@code AsyncRunnables.reuse} - presumably shares one in-flight run among callers; TODO confirm. */
    private final AsyncRunnable consolidate;
    // Stream statistics, one basic/detailed pair per operation.
    private final BasicStreamStats<CrdtData<K, S>> uploadStats;
    private final DetailedStreamStats<CrdtData<K, S>> uploadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> downloadStats;
    private final DetailedStreamStats<CrdtData<K, S>> downloadStatsDetailed;
    private final BasicStreamStats<CrdtData<K, S>> takeStats;
    private final DetailedStreamStats<CrdtData<K, S>> takeStatsDetailed;
    private final BasicStreamStats<CrdtTombstone<K>> removeStats;
    private final DetailedStreamStats<CrdtTombstone<K>> removeStatsDetailed;
    // Per-item event counters, one per operation.
    private final EventStats uploadedItems;
    private final EventStats downloadedItems;
    private final EventStats takenItems;
    private final EventStats removedItems;
    /** Statistics of the promises returned by {@code consolidate()}. */
    private final PromiseStats consolidationStats;

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$Builder.class */
    public final class Builder extends AbstractBuilder<FileSystemCrdtStorage<K, S>.Builder, FileSystemCrdtStorage<K, S>> {
        private Builder() {
        }

        public FileSystemCrdtStorage<K, S>.Builder withNamingStrategy(Supplier<String> supplier) {
            checkNotBuilt(this);
            FileSystemCrdtStorage.this.namingStrategy = supplier;
            return this;
        }

        public FileSystemCrdtStorage<K, S>.Builder withFilter(CrdtFilter<S> crdtFilter) {
            checkNotBuilt(this);
            FileSystemCrdtStorage.this.filter = crdtFilter;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public FileSystemCrdtStorage<K, S> m26doBuild() {
            return FileSystemCrdtStorage.this;
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$CrdtAccumulator.class */
    /**
     * Per-key reduction state: the live entries seen so far plus the timestamp of a tombstone.
     *
     * @param <S> CRDT state type
     */
    public static class CrdtAccumulator<S> {
        /** Entries that have not been removed by a tombstone. */
        final Set<CrdtEntry<S>> entries = new HashSet<>();
        /** Timestamp of the recorded tombstone; stays 0 until a tombstone is seen. */
        private long tombstoneTimestamp;

        /**
         * Seeds the accumulator with the first item of a key.
         *
         * @param s state of the first item, or {@code null} when the item is a tombstone
         * @param j timestamp of the first item
         */
        CrdtAccumulator(@Nullable S s, long j) {
            if (s == null) {
                this.tombstoneTimestamp = j;
            } else {
                this.entries.add(new CrdtEntry<>(s, j));
            }
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$CrdtEntry.class */
    /**
     * Immutable pair of a CRDT state and the timestamp it was written with.
     * Declared as a real record: a class may not extend {@code java.lang.Record} directly, and
     * the record supplies the same constructor, accessors, {@code equals}, {@code hashCode} and
     * {@code toString} that the decompiled class spelled out by hand.
     *
     * @param state     the CRDT state
     * @param timestamp timestamp associated with the state
     * @param <S>       CRDT state type
     */
    public record CrdtEntry<S>(S state, long timestamp) {
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$CrdtReducer.class */
    /**
     * Reducer that folds all items sharing a key into at most one output item: tombstones drop
     * the entries they cover, surviving entries are merged with the storage's CRDT function, and
     * the merged state is emitted only if the storage's filter accepts it.
     */
    public final class CrdtReducer implements Reducer<K, CrdtReducingData<K, S>, CrdtReducingData<K, S>, CrdtAccumulator<S>> {
        /** Whether a key with no surviving entries is emitted as a tombstone item. */
        final boolean includeTombstones;

        CrdtReducer(boolean includeTombstones) {
            this.includeTombstones = includeTombstones;
        }

        /** Creates the accumulator for a key from its first item. */
        @Override
        public CrdtAccumulator<S> onFirstItem(StreamDataAcceptor<CrdtReducingData<K, S>> stream, K key, CrdtReducingData<K, S> firstValue) {
            return new CrdtAccumulator<>(firstValue.state(), firstValue.timestamp());
        }

        /**
         * Folds one more item of the same key into the accumulator.
         * A tombstone removes every entry that is not newer than it; a data item is kept only if
         * it is newer than the recorded tombstone.
         */
        @Override
        public CrdtAccumulator<S> onNextItem(StreamDataAcceptor<CrdtReducingData<K, S>> stream, K key, CrdtReducingData<K, S> nextValue, CrdtAccumulator<S> accumulator) {
            S state = nextValue.state();
            long timestamp = nextValue.timestamp();
            if (state == null) {
                // Keep the newest tombstone: an older tombstone arriving later must not lower the
                // threshold, otherwise data already covered by a newer tombstone could reappear.
                accumulator.tombstoneTimestamp = Math.max(accumulator.tombstoneTimestamp, timestamp);
                accumulator.entries.removeIf(entry -> entry.timestamp() <= timestamp);
            } else if (timestamp > accumulator.tombstoneTimestamp) {
                accumulator.entries.add(new CrdtEntry<>(state, timestamp));
            }
            return accumulator;
        }

        /**
         * Emits the result for a key. With no surviving entries a tombstone item is emitted only
         * when {@link #includeTombstones} is set; otherwise all entries are merged, the largest
         * timestamp is kept, and the merged state is emitted if the filter accepts it.
         */
        @Override
        public void onComplete(StreamDataAcceptor<CrdtReducingData<K, S>> stream, K key, CrdtAccumulator<S> accumulator) {
            if (accumulator.entries.isEmpty()) {
                if (includeTombstones) {
                    stream.accept(new CrdtReducingData<>(key, null, accumulator.tombstoneTimestamp));
                }
                return;
            }

            Iterator<CrdtEntry<S>> iterator = accumulator.entries.iterator();
            CrdtEntry<S> first = iterator.next();
            S merged = first.state();
            long newest = first.timestamp();
            while (iterator.hasNext()) {
                CrdtEntry<S> entry = iterator.next();
                merged = FileSystemCrdtStorage.this.function.merge(merged, newest, entry.state(), entry.timestamp());
                newest = Math.max(newest, entry.timestamp());
            }

            if (FileSystemCrdtStorage.this.filter.test(merged)) {
                stream.accept(new CrdtReducingData<>(key, merged, newest));
            }
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$CrdtReducingData.class */
    /**
     * Item format shared by data and tombstones while reducing and on disk: a key, an optional
     * state ({@code null} marks a tombstone) and a timestamp.
     * Declared as a real record: a class may not extend {@code java.lang.Record} directly, and
     * the record supplies the same constructor, accessors, {@code equals}, {@code hashCode} and
     * {@code toString} that the decompiled class spelled out by hand.
     *
     * @param key       key of the item
     * @param state     CRDT state, or {@code null} for a tombstone
     * @param timestamp timestamp of the item
     * @param <K>       key type
     * @param <S>       CRDT state type
     */
    public record CrdtReducingData<K extends Comparable<K>, S>(K key, @Nullable S state, long timestamp) {
        /** Wraps a data item, copying its key, state and timestamp. */
        static <K extends Comparable<K>, S> CrdtReducingData<K, S> ofData(CrdtData<K, S> crdtData) {
            return new CrdtReducingData<>(crdtData.getKey(), crdtData.getState(), crdtData.getTimestamp());
        }

        /** Wraps a tombstone as an item with a {@code null} state. */
        static <K extends Comparable<K>, S> CrdtReducingData<K, S> ofTombstone(CrdtTombstone<K> crdtTombstone) {
            return new CrdtReducingData<>(crdtTombstone.getKey(), null, crdtTombstone.getTimestamp());
        }
    }

    /* loaded from: input_file:io/activej/crdt/storage/local/FileSystemCrdtStorage$NonEmptyFilter.class */
    public static final class NonEmptyFilter<T> extends AbstractStreamTransformer<T, T> {
        private final Runnable onNonEmpty;
        private boolean empty = true;

        private NonEmptyFilter(Runnable runnable) {
            this.onNonEmpty = runnable;
        }

        protected StreamDataAcceptor<T> onResumed(StreamDataAcceptor<T> streamDataAcceptor) {
            return obj -> {
                if (this.empty) {
                    this.empty = false;
                    this.onNonEmpty.run();
                }
                streamDataAcceptor.accept(obj);
            };
        }

        public boolean isEmpty() {
            return this.empty;
        }
    }

    /**
     * Creates a storage with default settings: random UUID file names, an accept-all filter and
     * fresh statistics collectors that use {@link #DEFAULT_SMOOTHING_WINDOW}.
     *
     * @param reactor        reactor this storage is bound to
     * @param files          file system that holds the CRDT files
     * @param dataSerializer source of the key and state serializers
     * @param mergeFunction  function used to merge states of the same key
     */
    private FileSystemCrdtStorage(Reactor reactor, IFileSystem files, CrdtDataBinarySerializer<K, S> dataSerializer, CrdtFunction<S> mergeFunction) {
        super(reactor);

        // Defaults that the builder may override.
        namingStrategy = () -> UUID.randomUUID().toString();
        filter = state -> true;

        consolidate = AsyncRunnables.reuse(this::doConsolidate);

        // Stream statistics.
        uploadStats = StreamStats.basic();
        uploadStatsDetailed = StreamStats.detailed();
        downloadStats = StreamStats.basic();
        downloadStatsDetailed = StreamStats.detailed();
        takeStats = StreamStats.basic();
        takeStatsDetailed = StreamStats.detailed();
        removeStats = StreamStats.basic();
        removeStatsDetailed = StreamStats.detailed();

        // Item counters and consolidation statistics.
        uploadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        downloadedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        takenItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        removedItems = EventStats.create(DEFAULT_SMOOTHING_WINDOW);
        consolidationStats = PromiseStats.create(DEFAULT_SMOOTHING_WINDOW);

        fileSystem = files;
        function = mergeFunction;
        serializer = createSerializer(dataSerializer);
    }

    /**
     * Creates a storage with default settings and an explicit merge function.
     *
     * @param reactor                  reactor the storage is bound to
     * @param iFileSystem              file system that holds the CRDT files
     * @param crdtDataBinarySerializer source of the key and state serializers
     * @param crdtFunction             function used to merge states of the same key
     * @return the built storage
     */
    public static <K extends Comparable<K>, S> FileSystemCrdtStorage<K, S> create(Reactor reactor, IFileSystem iFileSystem, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer, CrdtFunction<S> crdtFunction) {
        // build() is already typed to FileSystemCrdtStorage<K, S>; the raw cast only added an unchecked conversion.
        return builder(reactor, iFileSystem, crdtDataBinarySerializer, crdtFunction).build();
    }

    /**
     * Creates a storage with default settings for states that implement {@link CrdtType}; the
     * merge function is {@code CrdtFunction.ofCrdtType()}.
     *
     * @param reactor                  reactor the storage is bound to
     * @param iFileSystem              file system that holds the CRDT files
     * @param crdtDataBinarySerializer source of the key and state serializers
     * @return the built storage
     */
    public static <K extends Comparable<K>, S extends CrdtType<S>> FileSystemCrdtStorage<K, S> create(Reactor reactor, IFileSystem iFileSystem, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        // build() is already typed to FileSystemCrdtStorage<K, S>; the raw cast only added an unchecked conversion.
        return builder(reactor, iFileSystem, crdtDataBinarySerializer, CrdtFunction.<S>ofCrdtType()).build();
    }

    /**
     * Starts building a storage with an explicit merge function.
     * The storage instance is constructed here from the given arguments and the returned builder
     * is bound to it; an inner {@code Builder} cannot be created without an enclosing instance.
     *
     * @param reactor                  reactor the storage is bound to
     * @param iFileSystem              file system that holds the CRDT files
     * @param crdtDataBinarySerializer source of the key and state serializers
     * @param crdtFunction             function used to merge states of the same key
     * @return a builder for the new storage
     */
    public static <K extends Comparable<K>, S> FileSystemCrdtStorage<K, S>.Builder builder(Reactor reactor, IFileSystem iFileSystem, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer, CrdtFunction<S> crdtFunction) {
        return new FileSystemCrdtStorage<K, S>(reactor, iFileSystem, crdtDataBinarySerializer, crdtFunction).new Builder();
    }

    /**
     * Starts building a storage for states that implement {@link CrdtType}; the merge function is
     * {@code CrdtFunction.ofCrdtType()}.
     * The storage instance is constructed here from the given arguments and the returned builder
     * is bound to it; an inner {@code Builder} cannot be created without an enclosing instance.
     *
     * @param reactor                  reactor the storage is bound to
     * @param iFileSystem              file system that holds the CRDT files
     * @param crdtDataBinarySerializer source of the key and state serializers
     * @return a builder for the new storage
     */
    public static <K extends Comparable<K>, S extends CrdtType<S>> FileSystemCrdtStorage<K, S>.Builder builder(Reactor reactor, IFileSystem iFileSystem, CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        return new FileSystemCrdtStorage<K, S>(reactor, iFileSystem, crdtDataBinarySerializer, CrdtFunction.<S>ofCrdtType()).new Builder();
    }

    /**
     * Returns a consumer that writes the received data items into one new file, named by the
     * naming strategy plus {@link #FILE_EXTENSION}. Items pass through the upload stream stats
     * and the uploaded-items counter; acknowledgement failures are wrapped in a
     * {@link CrdtException}.
     *
     * @return an already completed promise of the consumer
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtData<K, S>>> upload() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        String fileName = namingStrategy.get() + FILE_EXTENSION;
        // Fully typed chain: with raw StreamConsumer the lambda parameters degrade to Object.
        StreamConsumer<CrdtData<K, S>> consumer = this.<CrdtData<K, S>>uploadNonEmpty(fileName, CrdtReducingData::ofData)
                .transformWith(detailedStats ? uploadStatsDetailed : uploadStats)
                .transformWith(Utils.onItem(uploadedItems::recordEvent))
                .withAcknowledgement(ack -> ack
                        .mapException(e -> new CrdtException("Error while uploading CRDT data to file", e)));
        return Promise.of(consumer);
    }

    /**
     * Streams the merged content of all files, keeping only items whose timestamp is not older
     * than the given one. Tombstones are not emitted. The list-and-download attempt is repeated
     * while it fails with {@link FileNotFoundException}; any other outcome ends the retry loop.
     * Failures are wrapped in a {@link CrdtException}.
     *
     * @param timestamp minimal timestamp (inclusive) of the items to download
     * @return promise of a supplier of the merged data
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> download(long timestamp) {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        return Promises.retry(
                        (ignored, e) -> !(e instanceof FileNotFoundException),
                        () -> fileSystem.list("*")
                                .then(files -> doDownload(files.keySet(), timestamp, false))
                                .map(supplier -> supplier
                                        .transformWith(StreamTransformers.mapper(item -> new CrdtData<>(item.key(), item.timestamp(), item.state())))
                                        .transformWith(detailedStats ? downloadStatsDetailed : downloadStats)
                                        .transformWith(Utils.onItem(downloadedItems::recordEvent))))
                .mapException(e -> new CrdtException("Failed to download CRDT data", e));
    }

    /**
     * Streams the merged content of all files and deletes those files once the returned supplier
     * is acknowledged. Only one take may run at a time; a second call fails immediately. The
     * list-and-download attempt is repeated while it fails with {@link FileNotFoundException}.
     * Failures are wrapped in a {@link CrdtException}.
     *
     * @return promise of a supplier of the merged data
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamSupplier<CrdtData<K, S>>> take() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        if (taken != null) {
            return Promise.ofException(new CrdtException("Data is already being taken"));
        }
        // A non-null set marks a take in progress; it stays non-null across retry attempts.
        taken = new HashSet<>();
        return Promises.retry(
                        (ignored, e) -> !(e instanceof FileNotFoundException),
                        () -> fileSystem.list("*")
                                // Each attempt claims exactly the files it has just listed.
                                .whenResult(files -> taken = new HashSet<>(files.keySet()))
                                .then(files -> doDownload(files.keySet(), 0L, false)
                                        .map(supplier -> supplier
                                                .transformWith(StreamTransformers.mapper(item -> new CrdtData<>(item.key(), item.timestamp(), item.state())))
                                                .transformWith(detailedStats ? takeStatsDetailed : takeStats)
                                                .transformWith(Utils.onItem(takenItems::recordEvent)))
                                        .whenResult(supplier -> supplier.getAcknowledgement()
                                                .then(() -> fileSystem.deleteAll(files.keySet()))
                                                .whenComplete(() -> taken = null))))
                // Release the claim only after the whole retry loop has failed. Clearing it inside
                // an attempt made the next attempt dereference null, and a failed list() never
                // cleared it at all, blocking every later take().
                .whenException(e -> taken = null)
                .mapException(e -> new CrdtException("Failed to take CRDT data", e));
    }

    /**
     * Opens the given files, deserializes them, drops items older than {@code timestamp} and
     * reduces all streams by key with a {@link CrdtReducer}.
     *
     * @param files             names of the files to read
     * @param timestamp         minimal timestamp (inclusive) of the items to keep
     * @param includeTombstones whether keys without surviving data are emitted as tombstones
     * @return promise of the reduced stream; its end-of-stream failure is wrapped in a {@link CrdtException}
     */
    private Promise<StreamSupplier<CrdtReducingData<K, S>>> doDownload(Set<String> files, long timestamp, boolean includeTombstones) {
        return Promises.toList(files.stream()
                        .map(fileName -> fileSystem.download(fileName)
                                .map(channel -> channel
                                        .transformWith(ChannelDeserializer.create(serializer))
                                        .transformWith(StreamTransformers.filter(item -> item.timestamp() >= timestamp)))))
                .map(suppliers -> {
                    // Fully typed reducer: with the raw type the key and end-of-stream lambdas degrade to Object.
                    StreamReducer<K, CrdtReducingData<K, S>, CrdtAccumulator<S>> reducer = StreamReducer.create();

                    suppliers.forEach(supplier ->
                            supplier.streamTo(reducer.newInput(item -> item.key(), new CrdtReducer(includeTombstones))));

                    return reducer.getOutput()
                            .withEndOfStream(eos -> eos
                                    .mapException(e -> new CrdtException("Error while downloading CRDT data", e)));
                });
    }

    /**
     * Returns a consumer that writes the received tombstones into one new file, named by the
     * naming strategy plus {@link #FILE_EXTENSION}. Items pass through the remove stream stats
     * and the removed-items counter; acknowledgement failures are wrapped in a
     * {@link CrdtException}.
     *
     * @return an already completed promise of the consumer
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<StreamConsumer<CrdtTombstone<K>>> remove() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        String fileName = namingStrategy.get() + FILE_EXTENSION;
        // Fully typed chain: with raw StreamConsumer the lambda parameters degrade to Object.
        StreamConsumer<CrdtTombstone<K>> consumer = this.<CrdtTombstone<K>>uploadNonEmpty(fileName, CrdtReducingData::ofTombstone)
                .transformWith(detailedStats ? removeStatsDetailed : removeStats)
                .transformWith(Utils.onItem(removedItems::recordEvent))
                .withAcknowledgement(ack -> ack
                        .mapException(e -> new CrdtException("Error while removing CRDT data", e)));
        return Promise.of(consumer);
    }

    /**
     * Pings the underlying file system.
     *
     * @return promise that fails with a {@link CrdtException} wrapping the cause when the ping fails
     */
    @Override // io.activej.crdt.storage.ICrdtStorage
    public Promise<Void> ping() {
        if (CHECKS) {
            Reactive.checkInReactorThread(this);
        }
        Promise<Void> pinged = fileSystem.ping();
        return pinged.mapException(e -> new CrdtException("Failed to PING file system", e));
    }

    /**
     * Starts the service; apart from the reactor-thread check there is nothing to do.
     *
     * @return an already completed promise
     */
    @Override
    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        Promise<Void> started = Promise.complete();
        return started;
    }

    /**
     * Stops the service; apart from the reactor-thread check there is nothing to do.
     *
     * @return an already completed promise
     */
    @Override
    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        Promise<Void> stopped = Promise.complete();
        return stopped;
    }

    /**
     * Runs file consolidation through the {@code consolidate} runnable and records the outcome
     * in the consolidation statistics.
     *
     * @return promise of the consolidation run
     */
    public Promise<Void> consolidate() {
        Reactive.checkInReactorThread(this);
        Promise<Void> run = consolidate.run();
        return run.whenComplete(consolidationStats.recordStats());
    }

    /**
     * Merges one group of files into a single new file and deletes the originals.
     * Files claimed by a running take are excluded, the group is chosen by
     * {@link #pickFilesForConsolidation(Map)}, and tombstones are preserved in the merged file.
     *
     * @return promise that fails with a {@link CrdtException} wrapping the cause on any error
     */
    private Promise<Void> doConsolidate() {
        return fileSystem.list("*")
                .map(listed -> taken == null ?
                        listed :
                        listed.entrySet().stream()
                                .filter(file -> !taken.contains(file.getKey()))
                                .collect(io.activej.common.Utils.entriesToLinkedHashMap()))
                .map(FileSystemCrdtStorage::pickFilesForConsolidation)
                .then(group -> {
                    if (group.isEmpty()) {
                        logger.info("No files to consolidate");
                        return Promise.complete();
                    }

                    String target = namingStrategy.get() + ".bin";
                    logger.info("Started consolidating files into {} from {}", target, group);

                    return doDownload(group, 0L, true)
                            .then(merged -> merged.streamTo(uploadNonEmpty(target, Function.identity())))
                            .then(() -> fileSystem.deleteAll(group));
                })
                .mapException(e -> new CrdtException("Files consolidation failed", e));
    }

    /**
     * Chooses the files to merge in one consolidation run.
     * Files are grouped by the integer part of {@code log10(size)}, i.e. by order of magnitude
     * of their size. The largest group with more than one file is returned; on a tie the group
     * of smaller files wins because groups are visited in ascending order.
     *
     * @param files file names mapped to their metadata
     * @return names of the files to consolidate, or an empty set when no group has at least two files
     */
    @VisibleForTesting
    static Set<String> pickFilesForConsolidation(Map<String, FileMetadata> files) {
        if (files.isEmpty()) {
            return Set.of();
        }

        Map<Integer, Set<String>> groups = new TreeMap<>();
        for (Map.Entry<String, FileMetadata> file : files.entrySet()) {
            // The decompiled loop read the size from an undefined variable; both the size and
            // the name come from the same map entry.
            int magnitude = (int) Math.log10(file.getValue().getSize());
            groups.computeIfAbsent(magnitude, k -> new HashSet<>()).add(file.getKey());
        }

        Set<String> best = Set.of();
        for (Set<String> group : groups.values()) {
            int size = group.size();
            if (size > 1 && size > best.size()) {
                best = group;
            }
        }
        return best;
    }

    /**
     * Builds a consumer that serializes its items into the named file, creating the file lazily:
     * the file system upload is started only when the first item arrives. If the stream ends
     * without any item, a recycling consumer is used instead and no upload is started.
     *
     * @param str      name of the file to write
     * @param function converts an incoming item into the stored item format
     * @param <T>      type of the incoming items
     * @return consumer of the incoming items
     */
    private <T> StreamConsumer<T> uploadNonEmpty(String str, Function<T, CrdtReducingData<K, S>> function) {
        // Typed promise of the file consumer; with the raw type the lambdas below degrade to Object.
        SettablePromise<ChannelConsumer<ByteBuf>> consumerPromise = new SettablePromise<>();
        NonEmptyFilter<T> nonEmptyFilter = new NonEmptyFilter<>(() -> fileSystem.upload(str).subscribe(consumerPromise));
        return StreamConsumers.ofSupplier(supplier -> supplier
                .transformWith(nonEmptyFilter)
                .transformWith(StreamTransformers.mapper(function))
                .transformWith(ChannelSerializer.create(serializer))
                .withEndOfStream(eos -> eos
                        .whenComplete(() -> {
                            // Nothing was written: settle the promise so that streamTo can finish.
                            if (nonEmptyFilter.isEmpty()) {
                                consumerPromise.set(ChannelConsumers.recycling());
                            }
                        }))
                .streamTo(consumerPromise));
    }

    /**
     * Builds the serializer of the stored item format. An item is written as key, nullable state
     * and timestamp, in that order, and read back in the same order.
     *
     * @param crdtDataBinarySerializer source of the key and state serializers
     * @return serializer of {@link CrdtReducingData}
     */
    private static <K extends Comparable<K>, S> BinarySerializer<CrdtReducingData<K, S>> createSerializer(CrdtDataBinarySerializer<K, S> crdtDataBinarySerializer) {
        BinarySerializer<K> keySerializer = crdtDataBinarySerializer.getKeySerializer();
        // The state is null for tombstones, hence the nullable wrapper.
        BinarySerializer<S> stateSerializer = BinarySerializers.ofNullable(crdtDataBinarySerializer.getStateSerializer());
        return new BinarySerializer<CrdtReducingData<K, S>>() {
            @Override
            public void encode(BinaryOutput out, CrdtReducingData<K, S> item) {
                keySerializer.encode(out, item.key());
                stateSerializer.encode(out, item.state());
                CrdtDataBinarySerializer.TIMESTAMP_SERIALIZER.encode(out, item.timestamp());
            }

            // The decompiler had renamed this method, which left the serializer's abstract
            // decode method unimplemented.
            @Override
            public CrdtReducingData<K, S> decode(BinaryInput in) throws CorruptedDataException {
                K key = keySerializer.decode(in);
                S state = stateSerializer.decode(in);
                long timestamp = CrdtDataBinarySerializer.TIMESTAMP_SERIALIZER.decode(in);
                return new CrdtReducingData<>(key, state, timestamp);
            }
        };
    }

    /** JMX operation: makes subsequent operations use the detailed stream statistics. */
    @JmxOperation
    public void startDetailedMonitoring() {
        detailedStats = true;
    }

    /** JMX operation: makes subsequent operations use the basic stream statistics. */
    @JmxOperation
    public void stopDetailedMonitoring() {
        detailedStats = false;
    }

    /** @return basic stream statistics of uploads */
    @JmxAttribute
    public BasicStreamStats getUploadStats() {
        return uploadStats;
    }

    /** @return detailed stream statistics of uploads */
    @JmxAttribute
    public DetailedStreamStats getUploadStatsDetailed() {
        return uploadStatsDetailed;
    }

    /** @return basic stream statistics of downloads */
    @JmxAttribute
    public BasicStreamStats getDownloadStats() {
        return downloadStats;
    }

    /** @return detailed stream statistics of downloads */
    @JmxAttribute
    public DetailedStreamStats getDownloadStatsDetailed() {
        return downloadStatsDetailed;
    }

    /** @return basic stream statistics of takes */
    @JmxAttribute
    public BasicStreamStats getTakeStats() {
        return takeStats;
    }

    /** @return detailed stream statistics of takes */
    @JmxAttribute
    public DetailedStreamStats getTakeStatsDetailed() {
        return takeStatsDetailed;
    }

    /** @return basic stream statistics of removals */
    @JmxAttribute
    public BasicStreamStats getRemoveStats() {
        return removeStats;
    }

    /** @return detailed stream statistics of removals */
    @JmxAttribute
    public DetailedStreamStats getRemoveStatsDetailed() {
        return removeStatsDetailed;
    }

    /** @return statistics of the consolidation runs */
    @JmxAttribute
    public PromiseStats getConsolidationStats() {
        return consolidationStats;
    }

    /** @return event counter of uploaded items */
    @JmxAttribute
    public EventStats getUploadedItems() {
        return uploadedItems;
    }

    /** @return event counter of downloaded items */
    @JmxAttribute
    public EventStats getDownloadedItems() {
        return downloadedItems;
    }

    /** @return event counter of taken items */
    @JmxAttribute
    public EventStats getTakenItems() {
        return takenItems;
    }

    /** @return event counter of removed items */
    @JmxAttribute
    public EventStats getRemovedItems() {
        return removedItems;
    }
}
