package io.activej.cube.etcd;

import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.tuple.Tuple2;
import io.activej.cube.CubeStructure;
import io.activej.cube.aggregation.AggregationChunk;
import io.activej.cube.aggregation.json.JsonCodecs;
import io.activej.cube.aggregation.ot.AggregationDiff;
import io.activej.cube.exception.CubeException;
import io.activej.cube.ot.CubeDiff;
import io.activej.etcd.EtcdEventProcessor;
import io.activej.etcd.EtcdListener;
import io.activej.etcd.EtcdUtils;
import io.activej.etcd.codec.key.EtcdKeyCodecs;
import io.activej.etcd.codec.kv.EtcdKVCodec;
import io.activej.etcd.codec.kv.EtcdKVCodecs;
import io.activej.etcd.codec.prefix.EtcdPrefixCodec;
import io.activej.etcd.codec.value.EtcdValueCodec;
import io.activej.etcd.exception.MalformedEtcdDataException;
import io.activej.etl.LogDiff;
import io.activej.etl.LogPositionDiff;
import io.activej.json.JsonUtils;
import io.activej.multilog.LogPosition;
import io.activej.ot.uplink.AsyncOTUplink;
import io.activej.promise.Promise;
import io.activej.promise.SettablePromise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.KeyValue;
import io.etcd.jetcd.Watch;
import io.etcd.jetcd.options.DeleteOption;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.jetbrains.annotations.VisibleForTesting;

/* loaded from: input_file:io/activej/cube/etcd/CubeEtcdOTUplink.class */
public final class CubeEtcdOTUplink extends AbstractReactive implements AsyncOTUplink<Long, LogDiff<CubeDiff>, UplinkProtoCommit> {
    // etcd client; its KV client is used for reads/transactions and its watch client for fetching diffs.
    private final Client client;
    // Cube structure: source of the default chunk codecs and validator of chunk measures read from etcd.
    private final CubeStructure cubeStructure;
    // Root key: queried directly in fetch() and concatenated with the prefixes/keys below.
    private final ByteSequence root;
    // Prefix codec handed to EtcdKVCodecs.ofPrefixedEntry for chunk keys; defaults to EtcdUtils.AGGREGATION_ID_CODEC.
    private EtcdPrefixCodec<String> aggregationIdCodec;
    // Maps a string id to the key/value codec of its chunks; when left null the builder installs JSON-based codecs.
    private Function<String, EtcdKVCodec<Long, AggregationChunk>> chunkCodecsFactory;
    // Appended to root to address log positions; defaults to EtcdUtils.POS.
    private ByteSequence prefixPos;
    // Appended to root to address aggregation chunks; defaults to EtcdUtils.CHUNK.
    private ByteSequence prefixChunk;
    // Key of the timestamp entry touched on every push and rewritten by delete(); defaults to EtcdUtils.TIMESTAMP.
    private ByteSequence timestampKey;

    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdOTUplink$Builder.class */
    public final class Builder extends AbstractBuilder<Builder, CubeEtcdOTUplink> {
        private Builder() {
        }

        public Builder withChunkCodecsFactory(Function<String, EtcdKVCodec<Long, AggregationChunk>> function) {
            checkNotBuilt(this);
            CubeEtcdOTUplink.this.chunkCodecsFactory = function;
            return this;
        }

        public Builder withPrefixPos(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdOTUplink.this.prefixPos = byteSequence;
            return this;
        }

        public Builder withPrefixChunk(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdOTUplink.this.prefixChunk = byteSequence;
            return this;
        }

        public Builder withTimestampKey(ByteSequence byteSequence) {
            checkNotBuilt(this);
            CubeEtcdOTUplink.this.timestampKey = byteSequence;
            return this;
        }

        public Builder withAggregationIdCodec(EtcdPrefixCodec<String> etcdPrefixCodec) {
            checkNotBuilt(this);
            CubeEtcdOTUplink.this.aggregationIdCodec = etcdPrefixCodec;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public CubeEtcdOTUplink m35doBuild() {
            if (CubeEtcdOTUplink.this.chunkCodecsFactory == null) {
                Map map = (Map) CubeEtcdOTUplink.this.cubeStructure.getAggregationStructures().entrySet().stream().collect(Utils.entriesToLinkedHashMap(aggregationStructure -> {
                    return new AggregationChunkJsonEtcdKVCodec(JsonCodecs.ofPrimaryKey(aggregationStructure));
                }));
                CubeEtcdOTUplink cubeEtcdOTUplink = CubeEtcdOTUplink.this;
                Objects.requireNonNull(map);
                cubeEtcdOTUplink.chunkCodecsFactory = (v1) -> {
                    return r1.get(v1);
                };
            }
            return CubeEtcdOTUplink.this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdOTUplink$CubeCheckoutResponse.class */
    /**
     * Immutable result of a single checkout read from etcd.
     *
     * <p>Declared as a record: a class may not extend {@link Record} explicitly, and the
     * compiler-generated {@code toString}/{@code hashCode}/{@code equals} over all three
     * components are exactly what the previous hand-expanded form attempted to spell out.
     *
     * @param revision  revision taken from the checkout response header
     * @param positions log positions keyed by the string decoded from each position key
     * @param chunks    chunks grouped by the string id decoded from each chunk key
     */
    public record CubeCheckoutResponse(long revision, Map<String, LogPosition> positions, Map<String, Set<AggregationChunk>> chunks) {
    }

    /* loaded from: input_file:io/activej/cube/etcd/CubeEtcdOTUplink$UplinkProtoCommit.class */
    /**
     * Proto-commit handed from {@code createProtoCommit} to {@code push}: the diffs to store
     * together with the revision they were computed against.
     *
     * <p>Declared as a record: a class may not extend {@link Record} explicitly, and the
     * compiler-generated {@code toString}/{@code hashCode}/{@code equals} over both components
     * are exactly what the previous hand-expanded form attempted to spell out.
     *
     * @param parentRevision revision the diffs are based on
     * @param diffs          diffs to be written on push
     */
    public record UplinkProtoCommit(long parentRevision, List<LogDiff<CubeDiff>> diffs) {
    }

    /**
     * Creates an uplink bound to the given reactor, etcd client, cube structure and root key.
     * All overridable settings start at their {@code EtcdUtils} defaults; the chunk codec
     * factory is left unset so that the builder can install a default one.
     *
     * <p>NOTE(review): {@code EtcdUtils} is referenced unqualified here while other methods of
     * this class use the fully qualified {@code io.activej.etcd.EtcdUtils} — confirm which
     * class actually declares these constants.
     *
     * @param reactor       reactor this uplink belongs to
     * @param client        etcd client
     * @param cubeStructure structure of the cube whose state is stored
     * @param byteSequence  root key
     */
    private CubeEtcdOTUplink(Reactor reactor, Client client, CubeStructure cubeStructure, ByteSequence byteSequence) {
        super(reactor);

        // Fixed collaborators.
        this.client = client;
        this.cubeStructure = cubeStructure;
        this.root = byteSequence;

        // Defaults that the builder may override.
        this.prefixPos = EtcdUtils.POS;
        this.prefixChunk = EtcdUtils.CHUNK;
        this.timestampKey = EtcdUtils.TIMESTAMP;
        this.aggregationIdCodec = EtcdUtils.AGGREGATION_ID_CODEC;
    }

    /**
     * Starts building an uplink.
     *
     * <p>{@link Builder} is an inner class that mutates its enclosing uplink, so a fresh uplink
     * is created from the arguments first and the builder is instantiated on it. Note that the
     * private constructor takes {@code client} before {@code cubeStructure}.
     *
     * @param reactor       reactor the uplink will belong to
     * @param cubeStructure structure of the cube whose state is stored
     * @param client        etcd client
     * @param byteSequence  root key
     * @return builder bound to the newly created uplink
     */
    public static Builder builder(Reactor reactor, CubeStructure cubeStructure, Client client, ByteSequence byteSequence) {
        return new CubeEtcdOTUplink(reactor, client, cubeStructure, byteSequence).new Builder();
    }

    /**
     * Reads the current cube state and exposes it as diffs against an empty state.
     *
     * <p>Each stored log position becomes a {@link LogPositionDiff} from {@code null}, and each
     * group of chunks becomes an {@link AggregationDiff}. When there are neither positions nor
     * aggregation diffs the returned diff list is empty, otherwise it holds exactly one
     * {@link LogDiff}. The checkout revision is used both as commit id and as level.
     *
     * @return promise of the fetched data
     */
    public Promise<AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>> checkout() {
        Reactive.checkInReactorThread(this);
        CompletableFuture<AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>> fetched = doCheckout().thenApply(snapshot -> {
            Map<String, LogPositionDiff> positionDiffs = snapshot.positions().entrySet().stream()
                .collect(Utils.entriesToLinkedHashMap(position -> new LogPositionDiff((LogPosition) null, position)));
            Map<String, AggregationDiff> aggregationDiffs = snapshot.chunks().entrySet().stream()
                .collect(Utils.entriesToLinkedHashMap(AggregationDiff::of));
            CubeDiff cubeDiff = CubeDiff.of(aggregationDiffs);

            List<LogDiff<CubeDiff>> diffs;
            if (positionDiffs.isEmpty() && cubeDiff.getDiffs().isEmpty()) {
                diffs = List.of();
            } else {
                diffs = List.of(LogDiff.of(positionDiffs, cubeDiff));
            }

            long revision = snapshot.revision();
            return new AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>(revision, revision, diffs);
        });
        return Promise.ofCompletionStage(fetched);
    }

    /**
     * Issues one checkout against etcd that reads both the log positions and the chunks stored
     * under the root key, validates the chunks and packs everything into a
     * {@link CubeCheckoutResponse}.
     *
     * <p>Positions are collected into an entry map; chunks are grouped into sets by the first
     * component of the decoded tuple. The measures of every chunk are validated against the
     * cube structure under that group's key.
     *
     * @return future of the checkout result; the response callback throws
     *         {@link MalformedEtcdDataException} (with the validation failure as its cause)
     *         when a stored chunk does not pass validation
     */
    private CompletableFuture<CubeCheckoutResponse> doCheckout() {
        return io.activej.etcd.EtcdUtils.checkout(this.client.getKVClient(), 0L, new EtcdUtils.CheckoutRequest[]{
            EtcdUtils.CheckoutRequest.ofMapEntry(
                this.root.concat(this.prefixPos),
                EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), logPositionEtcdCodec()),
                Utils.entriesToLinkedHashMap()),
            EtcdUtils.CheckoutRequest.of(
                this.root.concat(this.prefixChunk),
                EtcdKVCodecs.ofPrefixedEntry(this.aggregationIdCodec, this.chunkCodecsFactory),
                Collectors.groupingBy(Tuple2::value1, Collectors.mapping(Tuple2::value2, Collectors.toSet())))
        }, (header, objArr) -> {
            // Results arrive in the same order as the requests above.
            @SuppressWarnings("unchecked")
            Map<String, LogPosition> positions = (Map<String, LogPosition>) objArr[0];
            @SuppressWarnings("unchecked")
            Map<String, Set<AggregationChunk>> chunks = (Map<String, Set<AggregationChunk>>) objArr[1];

            for (Map.Entry<String, Set<AggregationChunk>> entry : chunks.entrySet()) {
                for (AggregationChunk chunk : entry.getValue()) {
                    try {
                        this.cubeStructure.validateMeasures(entry.getKey(), chunk.getMeasures());
                    } catch (MalformedDataException e) {
                        // Keep the original failure as the cause so its stack trace is not lost.
                        throw new MalformedEtcdDataException(e.getMessage(), e);
                    }
                }
            }
            return new CubeCheckoutResponse(header.getRevision(), positions, chunks);
        });
    }

    /**
     * Fetches the diffs made after the given revision.
     *
     * <p>The root key is read first to pick the target revision: the modification revision of
     * the first returned key-value, or the response header revision when nothing is stored
     * under the root key. A failed read is reported as a {@link CubeException} with the message
     * {@code "Failed to fetch diffs"}.
     *
     * @param l revision already known to the caller
     * @return promise of the diffs up to the target revision, which is used both as commit id
     *         and as level
     */
    public Promise<AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>> fetch(Long l) {
        Reactive.checkInReactorThread(this);
        return Promise.ofCompletionStage(
                this.client.getKVClient().get(this.root)
                    .exceptionallyCompose(failure -> {
                        return CompletableFuture.failedFuture(new CubeException("Failed to fetch diffs", io.activej.etcd.EtcdUtils.convertStatusException(failure.getCause())));
                    }))
            .then(response -> {
                long targetRevision;
                if (response.getKvs().isEmpty()) {
                    targetRevision = response.getHeader().getRevision();
                } else {
                    targetRevision = response.getKvs().get(0).getModRevision();
                }
                return doFetch(l.longValue(), targetRevision)
                    .map(diffs -> new AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>(targetRevision, targetRevision, diffs));
            });
    }

    /**
     * Collects all changes made under the position and chunk prefixes in revisions
     * {@code j + 1 .. j2} and reduces them to a single {@link LogDiff}.
     *
     * <p>A watch is started at revision {@code j + 1}. Position puts become
     * {@link LogPositionDiff}s from {@code null}; chunk puts and deletes are accumulated into
     * per-key {@link AggregationDiff}s (deleted chunks are represented by
     * {@code AggregationChunk.ofId}). Added chunks are validated against the cube structure.
     * Once the event for revision {@code j2} has been processed, the promise is completed on
     * the reactor with a one-element list and the watcher is closed.
     *
     * <p>NOTE(review): the listener reads the watcher from {@code atomicReference}, which is
     * only set after {@code watch(...)} returns — confirm callbacks cannot fire before that.
     *
     * @param j  revision already known to the caller; must not exceed {@code j2}
     * @param j2 last revision to include
     * @return promise of the reduced diffs; an empty list when both revisions are equal
     */
    private Promise<List<LogDiff<CubeDiff>>> doFetch(long j, final long j2) {
        Reactive.checkInReactorThread(this);
        Checks.checkArgument(j <= j2);
        if (j2 == j) {
            return Promise.of(Collections.emptyList());
        }
        final SettablePromise<List<LogDiff<CubeDiff>>> settablePromise = new SettablePromise<>();
        final AtomicReference<Watch.Watcher> atomicReference = new AtomicReference<>();
        // Balanced by completeExternalTask() in the listener's onCompleted().
        this.reactor.startExternalTask();
        atomicReference.set(io.activej.etcd.EtcdUtils.watch(this.client.getWatchClient(), j + 1, new EtcdUtils.WatchRequest[]{EtcdUtils.WatchRequest.ofMapEntry(this.root.concat(this.prefixPos), EtcdKVCodecs.ofMapEntry(EtcdKeyCodecs.ofString(), logPositionEtcdCodec()), new EtcdEventProcessor<String, Map.Entry<String, LogPosition>, Map<String, LogPositionDiff>>() {
            public Map<String, LogPositionDiff> createEventsAccumulator() {
                return new LinkedHashMap<>();
            }

            public void onPut(Map<String, LogPositionDiff> map, Map.Entry<String, LogPosition> entry) {
                map.put(entry.getKey(), new LogPositionDiff((LogPosition) null, entry.getValue()));
            }

            public void onDelete(Map<String, LogPositionDiff> map, String str) {
                // Deleting a stored log position is not supported by this uplink.
                throw new UnsupportedOperationException();
            }
        }), EtcdUtils.WatchRequest.of(this.root.concat(this.prefixChunk), EtcdKVCodecs.ofPrefixedEntry(this.aggregationIdCodec, this.chunkCodecsFactory), new EtcdEventProcessor<Tuple2<String, Long>, Tuple2<String, AggregationChunk>, Map<String, AggregationDiff>>() {
            public Map<String, AggregationDiff> createEventsAccumulator() {
                return new LinkedHashMap<>();
            }

            public void onPut(Map<String, AggregationDiff> map, Tuple2<String, AggregationChunk> tuple2) {
                map.compute(tuple2.value1(), (str, aggregationDiff) -> {
                    Set<AggregationChunk> added = Set.of(tuple2.value2());
                    if (aggregationDiff == null) {
                        return AggregationDiff.of(added, Set.of());
                    }
                    return AggregationDiff.of(Utils.union(aggregationDiff.getAddedChunks(), added), aggregationDiff.getRemovedChunks());
                });
            }

            public void onDelete(Map<String, AggregationDiff> map, Tuple2<String, Long> tuple2) {
                map.compute(tuple2.value1(), (str, aggregationDiff) -> {
                    // Only the id of a deleted chunk is known, so a chunk is rebuilt from the id alone.
                    Set<AggregationChunk> removed = Set.of(AggregationChunk.ofId(tuple2.value2().longValue()));
                    if (aggregationDiff == null) {
                        return AggregationDiff.of(Set.of(), removed);
                    }
                    return AggregationDiff.of(aggregationDiff.getAddedChunks(), Utils.union(aggregationDiff.getRemovedChunks(), removed));
                });
            }
        })}, new EtcdListener<Object[]>() {
            // Running reduction of every event received so far.
            LogDiff<CubeDiff> logDiff = LogDiff.empty();

            public void onConnectionEstablished() {
            }

            public void onNext(long j3, Object[] objArr) throws MalformedEtcdDataException {
                Checks.checkArgument(j3 <= j2);
                // Accumulators arrive in the same order as the watch requests above.
                @SuppressWarnings("unchecked")
                Map<String, LogPositionDiff> positionDiffs = (Map<String, LogPositionDiff>) objArr[0];
                @SuppressWarnings("unchecked")
                Map<String, AggregationDiff> aggregationDiffs = (Map<String, AggregationDiff>) objArr[1];
                for (Map.Entry<String, AggregationDiff> entry : aggregationDiffs.entrySet()) {
                    for (AggregationChunk chunk : entry.getValue().getAddedChunks()) {
                        try {
                            CubeEtcdOTUplink.this.cubeStructure.validateMeasures(entry.getKey(), chunk.getMeasures());
                        } catch (MalformedDataException e) {
                            // Keep the original failure as the cause so its stack trace is not lost.
                            throw new MalformedEtcdDataException(e.getMessage(), e);
                        }
                    }
                }
                this.logDiff = LogDiff.reduce(List.of(this.logDiff, LogDiff.of(positionDiffs, CubeDiff.of(aggregationDiffs))), CubeDiff::reduce);
                if (j3 == j2) {
                    CubeEtcdOTUplink.this.reactor.execute(() -> {
                        settablePromise.trySet(List.of(this.logDiff));
                    });
                    atomicReference.get().close();
                }
            }

            public void onError(Throwable th) {
                // The promise only accepts Exception; wrap anything else instead of failing with
                // a ClassCastException on the reactor thread and leaving the promise unsettled.
                Exception failure = th instanceof Exception ? (Exception) th : new RuntimeException(th);
                CubeEtcdOTUplink.this.reactor.submit(() -> {
                    settablePromise.trySetException(failure);
                });
                atomicReference.get().close();
            }

            public void onCompleted() {
                CubeEtcdOTUplink.this.reactor.completeExternalTask();
            }
        }));
        return settablePromise;
    }

    /**
     * Wraps the parent revision and the diffs into an {@link UplinkProtoCommit}; nothing is
     * written to etcd here.
     *
     * @param l    revision the diffs are based on
     * @param list diffs to be pushed
     * @param j    not used by this implementation
     * @return already completed promise of the proto-commit
     */
    public Promise<UplinkProtoCommit> createProtoCommit(Long l, List<LogDiff<CubeDiff>> list, long j) {
        Reactive.checkInReactorThread(this);
        long parentRevision = l.longValue();
        UplinkProtoCommit protoCommit = new UplinkProtoCommit(parentRevision, list);
        return Promise.of(protoCommit);
    }

    /**
     * Writes the diffs of a proto-commit to etcd in one transaction and reports what else
     * changed since the commit's parent revision.
     *
     * <p>The transaction touches the timestamp key and saves every diff. Afterwards the changes
     * between the parent revision and the revision just before the transaction's own are
     * fetched and returned, with the transaction revision as both commit id and level.
     *
     * @param uplinkProtoCommit proto-commit to store
     * @return promise of the diffs that preceded this push
     */
    public Promise<AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>> push(UplinkProtoCommit uplinkProtoCommit) {
        Reactive.checkInReactorThread(this);
        return Promise.ofCompletionStage(
                io.activej.etcd.EtcdUtils.executeTxnOps(this.client.getKVClient(), this.root, txn -> {
                    io.activej.etcd.EtcdUtils.touchTimestamp(txn, this.timestampKey, this.reactor);
                    for (LogDiff<CubeDiff> diff : uplinkProtoCommit.diffs()) {
                        EtcdUtils.saveCubeLogDiff(this.prefixPos, this.prefixChunk, this.aggregationIdCodec, this.chunkCodecsFactory, txn, diff);
                    }
                }))
            .then(response -> {
                long committedRevision = response.getHeader().getRevision();
                return doFetch(uplinkProtoCommit.parentRevision(), committedRevision - 1)
                    .map(precedingDiffs -> new AsyncOTUplink.FetchData<Long, LogDiff<CubeDiff>>(committedRevision, committedRevision, precedingDiffs));
            });
    }

    /**
     * Creates a value codec that stores a {@link LogPosition} as its JSON representation.
     *
     * <p>The decoding method carries the interface name {@code decodeValue}; the decompiler had
     * renamed it, so the anonymous class did not implement the codec's decoding method.
     *
     * @return new codec instance; decoding throws {@link MalformedEtcdDataException} (with the
     *         JSON failure as its cause) when the stored value is not a valid log position
     */
    public static EtcdValueCodec<LogPosition> logPositionEtcdCodec() {
        return new EtcdValueCodec<LogPosition>() {
            public ByteSequence encodeValue(LogPosition logPosition) {
                return io.activej.etcd.EtcdUtils.byteSequenceFrom(JsonUtils.toJson(io.activej.etl.json.JsonCodecs.ofLogPosition(), logPosition));
            }

            public LogPosition decodeValue(ByteSequence byteSequence) throws MalformedEtcdDataException {
                try {
                    return (LogPosition) JsonUtils.fromJson(io.activej.etl.json.JsonCodecs.ofLogPosition(), byteSequence.toString());
                } catch (MalformedDataException e) {
                    throw new MalformedEtcdDataException("Failed to decode log position of value '" + byteSequence + "'", e);
                }
            }
        };
    }

    /**
     * Removes every key that starts with the root key and then writes a fresh timestamp entry
     * holding the reactor's current time. Both etcd calls are awaited by blocking on their
     * futures.
     *
     * @throws ExecutionException   if either etcd operation fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @VisibleForTesting
    public void delete() throws ExecutionException, InterruptedException {
        KV kv = this.client.getKVClient();

        DeleteOption everythingUnderRoot = DeleteOption.builder().isPrefix(true).build();
        kv.delete(this.root, everythingUnderRoot).get();

        // The time is read only after the delete has completed.
        ByteSequence timestampEntryKey = this.root.concat(this.timestampKey);
        kv.put(timestampEntryKey, io.activej.etcd.EtcdUtils.TOUCH_TIMESTAMP_CODEC.encodeValue(Long.valueOf(this.reactor.currentTimeMillis()))).get();
    }

    /**
     * Erased-signature bridge emitted by the decompiler: casts the arguments to the types of
     * the typed {@code createProtoCommit(Long, List, long)} overload and delegates to it.
     */
    public /* bridge */ /* synthetic */ Promise createProtoCommit(Object obj, List list, long j) {
        Long parentRevision = (Long) obj;
        List<LogDiff<CubeDiff>> diffs = (List<LogDiff<CubeDiff>>) list;
        return createProtoCommit(parentRevision, diffs, j);
    }
}
