package io.activej.crdt.util;

import io.activej.crdt.messaging.CrdtRequest;
import io.activej.crdt.messaging.CrdtResponse;
import io.activej.crdt.messaging.Version;
import io.activej.crdt.wal.FileWriteAheadLog;
import io.activej.datastream.csp.ChannelDeserializer;
import io.activej.datastream.csp.ChannelSerializer;
import io.activej.datastream.processor.transformer.AbstractStreamTransformer;
import io.activej.datastream.processor.transformer.StreamTransformer;
import io.activej.datastream.supplier.StreamDataAcceptor;
import io.activej.promise.Promise;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.stream.StreamCodec;
import io.activej.serializer.stream.StreamCodecs;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:io/activej/crdt/util/Utils.class */
public final class Utils {
    private static final StreamCodec<Version> VERSION_CODEC = StreamCodec.create((v1, v2) -> {
        return new Version(v1, v2);
    }, (v0) -> {
        return v0.major();
    }, StreamCodecs.ofVarInt(), (v0) -> {
        return v0.minor();
    }, StreamCodecs.ofVarInt());
    public static final StreamCodec<CrdtRequest> CRDT_REQUEST_CODEC = createCrdtRequestStreamCodec();
    public static final StreamCodec<CrdtResponse> CRDT_RESPONSE_CODEC = createCrdtResponseStreamCodec();

    public static Promise<List<Path>> getWalFiles(Executor executor, Path path) {
        return Promise.ofBlocking(executor, () -> {
            Stream<Path> list = Files.list(path);
            try {
                List list2 = (List) list.filter(path2 -> {
                    return Files.isRegularFile(path2, new LinkOption[0]) && path2.toString().endsWith(FileWriteAheadLog.EXT_FINAL);
                }).collect(Collectors.toList());
                if (list != null) {
                    list.close();
                }
                return list2;
            } catch (Throwable th) {
                if (list != null) {
                    try {
                        list.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        });
    }

    public static Promise<Void> deleteWalFiles(Executor executor, Collection<Path> collection) {
        return Promise.ofBlocking(executor, () -> {
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                Files.deleteIfExists((Path) it.next());
            }
        });
    }

    public static <T> StreamTransformer<T, T> ackTransformer(UnaryOperator<Promise<Void>> unaryOperator) {
        return new StreamAckTransformer(unaryOperator);
    }

    public static <T> StreamTransformer<T, T> onItem(final Runnable runnable) {
        return new AbstractStreamTransformer<T, T>() { // from class: io.activej.crdt.util.Utils.1
            protected StreamDataAcceptor<T> onResumed(StreamDataAcceptor<T> streamDataAcceptor) {
                Runnable runnable2 = runnable;
                return obj -> {
                    runnable2.run();
                    streamDataAcceptor.accept(obj);
                };
            }
        };
    }

    private static StreamCodec<CrdtRequest> createCrdtRequestStreamCodec() {
        return (StreamCodec) StreamCodecs.SubtypeStreamCodec.builder().withSubtype(CrdtRequest.Download.class, StreamCodec.create((v1) -> {
            return new CrdtRequest.Download(v1);
        }, (v0) -> {
            return v0.token();
        }, StreamCodecs.ofVarLong())).withSubtype(CrdtRequest.Handshake.class, StreamCodec.create(CrdtRequest.Handshake::new, (v0) -> {
            return v0.version();
        }, VERSION_CODEC)).withSubtype(CrdtRequest.Ping.class, StreamCodecs.singleton(new CrdtRequest.Ping())).withSubtype(CrdtRequest.Take.class, StreamCodecs.singleton(new CrdtRequest.Take())).withSubtype(CrdtRequest.TakeAck.class, StreamCodecs.singleton(new CrdtRequest.TakeAck())).withSubtype(CrdtRequest.Upload.class, StreamCodecs.singleton(new CrdtRequest.Upload())).build();
    }

    private static StreamCodec<CrdtResponse> createCrdtResponseStreamCodec() {
        return (StreamCodec) StreamCodecs.SubtypeStreamCodec.builder().withSubtype(CrdtResponse.DownloadStarted.class, StreamCodecs.singleton(new CrdtResponse.DownloadStarted())).withSubtype(CrdtResponse.Handshake.class, StreamCodec.create(CrdtResponse.Handshake::new, (v0) -> {
            return v0.handshakeFailure();
        }, StreamCodecs.ofNullable(StreamCodec.create(CrdtResponse.HandshakeFailure::new, (v0) -> {
            return v0.minimalVersion();
        }, VERSION_CODEC, (v0) -> {
            return v0.message();
        }, StreamCodecs.ofString())))).withSubtype(CrdtResponse.Pong.class, StreamCodecs.singleton(new CrdtResponse.Pong())).withSubtype(CrdtResponse.RemoveAck.class, StreamCodecs.singleton(new CrdtResponse.RemoveAck())).withSubtype(CrdtResponse.ServerError.class, StreamCodec.create(CrdtResponse.ServerError::new, (v0) -> {
            return v0.message();
        }, StreamCodecs.ofString())).withSubtype(CrdtResponse.TakeStarted.class, StreamCodecs.singleton(new CrdtResponse.TakeStarted())).withSubtype(CrdtResponse.UploadAck.class, StreamCodecs.singleton(new CrdtResponse.UploadAck())).build();
    }

    public static <T> ChannelDeserializer<T> createDeserializer(BinarySerializer<T> binarySerializer) {
        return (ChannelDeserializer) ChannelDeserializer.builder(binarySerializer).withExplicitEndOfStream().build();
    }

    public static <T> ChannelSerializer<T> createSerializer(BinarySerializer<T> binarySerializer) {
        return (ChannelSerializer) ChannelSerializer.builder(binarySerializer).withExplicitEndOfStream().build();
    }
}
