package io.activej.cube.etcd;

import io.activej.common.Utils;
import io.activej.cube.aggregation.AggregationChunk;
import io.activej.cube.ot.CubeDiff;
import io.activej.etcd.TxnOps;
import io.activej.etcd.codec.key.EtcdKeyCodec;
import io.activej.etcd.codec.key.EtcdKeyCodecs;
import io.activej.etcd.codec.kv.EtcdKVCodec;
import io.activej.etcd.codec.kv.EtcdKVCodecs;
import io.activej.etcd.codec.prefix.EtcdPrefixCodec;
import io.activej.etcd.codec.prefix.EtcdPrefixCodecs;
import io.activej.etcd.codec.prefix.Prefix;
import io.activej.etcd.codec.value.EtcdValueCodec;
import io.activej.etcd.codec.value.EtcdValueCodecs;
import io.activej.etcd.exception.MalformedEtcdDataException;
import io.activej.etl.LogDiff;
import io.activej.etl.LogPositionDiff;
import io.etcd.jetcd.ByteSequence;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/* loaded from: input_file:io/activej/cube/etcd/EtcdUtils.class */
/**
 * Static constants, codecs and helpers used to write a cube {@link LogDiff} into an etcd
 * transaction ({@link TxnOps}).
 *
 * <p>All etcd operations are delegated to {@code io.activej.etcd.EtcdUtils}; this class only
 * decides which entries go to which operation and under which child prefix.
 */
public final class EtcdUtils {
    // NOTE(review): the four byte sequences below are not referenced inside this class;
    // presumably callers pass them as key prefixes / keys -- confirm against the call sites.

    /** Byte sequence built from the literal {@code "pos."}. */
    public static final ByteSequence POS = io.activej.etcd.EtcdUtils.byteSequenceFrom("pos.");

    /** Byte sequence built from the literal {@code "chunk."}. */
    public static final ByteSequence CHUNK = io.activej.etcd.EtcdUtils.byteSequenceFrom("chunk.");

    /** Byte sequence built from the literal {@code "timestamp"}. */
    public static final ByteSequence TIMESTAMP = io.activej.etcd.EtcdUtils.byteSequenceFrom("timestamp");

    /** Byte sequence built from the literal {@code "cleanup-revision"}. */
    public static final ByteSequence CLEANUP_REVISION = io.activej.etcd.EtcdUtils.byteSequenceFrom("cleanup-revision");

    /** Prefix codec for aggregation ids, created by {@code EtcdPrefixCodecs.ofTerminatingString('.')}. */
    public static final EtcdPrefixCodec<String> AGGREGATION_ID_CODEC = EtcdPrefixCodecs.ofTerminatingString('.');

    /** Value codec for revisions, created by {@code EtcdValueCodecs.ofLongString()}. */
    public static final EtcdValueCodec<Long> REVISION_CODEC = EtcdValueCodecs.ofLongString();

    /** Key codec that represents a chunk id as the decimal text of a {@code long}. */
    static final EtcdKeyCodec<Long> CHUNK_ID_CODEC = new EtcdKeyCodec<Long>() {
        /**
         * Parses the textual form of {@code byteSequence} as a {@code long}.
         *
         * @throws MalformedEtcdDataException if the text is not a parsable {@code long}
         */
        @Override
        public Long decodeKey(ByteSequence byteSequence) throws MalformedEtcdDataException {
            try {
                return Long.parseLong(byteSequence.toString());
            } catch (NumberFormatException e) {
                throw new MalformedEtcdDataException(e.getMessage());
            }
        }

        /** Encodes {@code l} as the byte sequence of its {@code toString()} form. */
        @Override
        public ByteSequence encodeKey(Long l) {
            return io.activej.etcd.EtcdUtils.byteSequenceFrom(l.toString());
        }
    };

    /**
     * Adds the operations describing {@code logDiff} to {@code txnOps}.
     *
     * <p>The position changes are written through {@code txnOps.child(byteSequence)}; every
     * {@link CubeDiff} contained in {@code logDiff} is written through
     * {@code txnOps.child(byteSequence2)}.
     *
     * @param byteSequence    argument of {@code txnOps.child(...)} used for log positions
     * @param byteSequence2   argument of {@code txnOps.child(...)} used for chunk changes
     * @param etcdPrefixCodec encodes an aggregation id into the prefix of its chunks
     * @param function        supplies the chunk codec for a given aggregation id
     * @param txnOps          transaction operations that receive the changes
     * @param logDiff         the diff to save
     */
    public static void saveCubeLogDiff(ByteSequence byteSequence, ByteSequence byteSequence2, EtcdPrefixCodec<String> etcdPrefixCodec, Function<String, EtcdKVCodec<Long, AggregationChunk>> function, TxnOps txnOps, LogDiff<CubeDiff> logDiff) {
        savePositions(txnOps.child(byteSequence), logDiff.getPositions());
        for (CubeDiff cubeDiff : logDiff.getDiffs()) {
            // A child is requested for every diff, as each diff is saved through its own child handle.
            saveCubeDiff(etcdPrefixCodec, function, txnOps.child(byteSequence2), cubeDiff);
        }
    }

    /**
     * Splits {@code map} by whether an entry's {@code from()} position {@code isInitial()}:
     * such entries are passed to {@code checkAndInsert} with their {@code to()} position, all
     * other entries are passed to {@code checkAndUpdate} with their {@code from()} and
     * {@code to()} positions.
     */
    private static void savePositions(TxnOps txnOps, Map<String, LogPositionDiff> map) {
        io.activej.etcd.EtcdUtils.checkAndInsert(
            txnOps,
            EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), CubeEtcdOTUplink.logPositionEtcdCodec()),
            map.entrySet().stream()
                .filter(entry -> entry.getValue().from().isInitial())
                .collect(Utils.entriesToLinkedHashMap(LogPositionDiff::to)));
        io.activej.etcd.EtcdUtils.checkAndUpdate(
            txnOps,
            EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), CubeEtcdOTUplink.logPositionEtcdCodec()),
            map.entrySet().stream()
                .filter(entry -> !entry.getValue().from().isInitial())
                .collect(Utils.entriesToLinkedHashMap(LogPositionDiff::from)),
            map.entrySet().stream()
                .filter(entry -> !entry.getValue().from().isInitial())
                .collect(Utils.entriesToLinkedHashMap(LogPositionDiff::to)));
    }

    /**
     * Adds the chunk changes of {@code cubeDiff} to {@code txnOps}, one aggregation at a time.
     *
     * <p>All removals (via {@code checkAndDelete}, by chunk id) are issued before any additions
     * (via {@code checkAndInsert}). Each aggregation uses the child of {@code txnOps} obtained
     * from the encoded prefix of its id and the codec returned by {@code function} for that id.
     */
    private static void saveCubeDiff(EtcdPrefixCodec<String> etcdPrefixCodec, Function<String, EtcdKVCodec<Long, AggregationChunk>> function, TxnOps txnOps, CubeDiff cubeDiff) {
        var removedByAggregation = cubeDiff.getDiffs().entrySet().stream()
            .collect(Utils.entriesToLinkedHashMap(diff -> diff.getRemovedChunks()));
        for (var removed : removedByAggregation.entrySet()) {
            String aggregationId = removed.getKey();
            io.activej.etcd.EtcdUtils.checkAndDelete(
                txnOps.child(etcdPrefixCodec.encodePrefix(new Prefix<>(aggregationId, ByteSequence.EMPTY))),
                function.apply(aggregationId),
                removed.getValue().stream().map(AggregationChunk::getChunkId).toList());
        }

        var addedByAggregation = cubeDiff.getDiffs().entrySet().stream()
            .collect(Utils.entriesToLinkedHashMap(diff -> diff.getAddedChunks()));
        for (var added : addedByAggregation.entrySet()) {
            String aggregationId = added.getKey();
            io.activej.etcd.EtcdUtils.checkAndInsert(
                txnOps.child(etcdPrefixCodec.encodePrefix(new Prefix<>(aggregationId, ByteSequence.EMPTY))),
                function.apply(aggregationId),
                added.getValue());
        }
    }
}
