package io.activej.ot.uplink;

import io.activej.async.function.AsyncPredicate;
import io.activej.async.util.LogUtils;
import io.activej.common.Utils;
import io.activej.common.exception.FatalErrorHandler;
import io.activej.common.function.FunctionEx;
import io.activej.common.ref.Ref;
import io.activej.ot.AsyncOTCommitFactory;
import io.activej.ot.OTAlgorithms;
import io.activej.ot.OTCommit;
import io.activej.ot.PollSanitizer;
import io.activej.ot.reducers.DiffsReducer;
import io.activej.ot.repository.AsyncOTRepository;
import io.activej.ot.system.OTSystem;
import io.activej.ot.uplink.AsyncOTUplink;
import io.activej.promise.Promise;
import io.activej.promise.PromisePredicates;
import io.activej.promise.Promises;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/ot/uplink/OTUplink.class */
/**
 * {@link AsyncOTUplink} implementation backed by an {@link AsyncOTRepository}.
 *
 * <p>Every public operation first asserts that it is invoked from this component's reactor
 * thread and then delegates to the repository and to the static helpers in {@link OTAlgorithms}.
 * Proto-commits of type {@code PC} are converted to and from {@link OTCommit} by the encoder and
 * decoder functions supplied at creation time.
 *
 * @param <K>  commit identifier type
 * @param <D>  diff (operation) type
 * @param <PC> proto-commit type exchanged with callers of this uplink
 */
public final class OTUplink<K, D, PC> extends AbstractReactive implements AsyncOTUplink<K, D, PC> {
    private static final Logger logger = LoggerFactory.getLogger(OTUplink.class);

    private final OTSystem<D> otSystem;
    private final AsyncOTRepository<K, D> repository;
    private final FunctionEx<OTCommit<K, D>, PC> protoCommitEncoder;
    private final FunctionEx<PC, OTCommit<K, D>> protoCommitDecoder;

    private OTUplink(Reactor reactor, AsyncOTRepository<K, D> repository, OTSystem<D> otSystem, FunctionEx<OTCommit<K, D>, PC> protoCommitEncoder, FunctionEx<PC, OTCommit<K, D>> protoCommitDecoder) {
        super(reactor);
        this.otSystem = otSystem;
        this.repository = repository;
        this.protoCommitEncoder = protoCommitEncoder;
        this.protoCommitDecoder = protoCommitDecoder;
    }

    /**
     * Creates an uplink that uses a custom proto-commit representation.
     *
     * @param reactor           reactor this uplink is bound to
     * @param asyncOTRepository repository that stores commits, heads and snapshots
     * @param oTSystem          OT system used for merging and squashing diffs
     * @param functionEx        encoder turning a repository commit into a proto-commit
     * @param functionEx2       decoder turning a proto-commit back into a repository commit
     * @return a new uplink instance
     */
    public static <K, D, C> OTUplink<K, D, C> create(Reactor reactor, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem, FunctionEx<OTCommit<K, D>, C> functionEx, FunctionEx<C, OTCommit<K, D>> functionEx2) {
        return new OTUplink<>(reactor, asyncOTRepository, oTSystem, functionEx, functionEx2);
    }

    /**
     * Creates an uplink whose proto-commits are the {@link OTCommit}s themselves
     * (encoder and decoder are identity functions).
     *
     * @param reactor           reactor this uplink is bound to
     * @param asyncOTRepository repository that stores commits, heads and snapshots
     * @param oTSystem          OT system used for merging and squashing diffs
     * @return a new uplink instance
     */
    public static <K, D> OTUplink<K, D, OTCommit<K, D>> create(Reactor reactor, AsyncOTRepository<K, D> asyncOTRepository, OTSystem<D> oTSystem) {
        return new OTUplink<>(reactor, asyncOTRepository, oTSystem, commit -> commit, commit -> commit);
    }

    /** Returns the repository this uplink delegates to. */
    public AsyncOTRepository<K, D> getRepository() {
        return repository;
    }

    /**
     * Asks the repository to create a commit on top of {@code k} and encodes it as a proto-commit.
     *
     * @param k    identifier of the parent commit
     * @param list diffs to be carried by the new commit
     * @param j    level passed to the repository together with the diffs
     *             (presumably the parent's level - confirm against the repository contract)
     * @return promise of the encoded proto-commit
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<PC> createProtoCommit(K k, List<D> list, long j) {
        Reactive.checkInReactorThread(this);
        return repository.createCommit(k, new AsyncOTCommitFactory.DiffsWithLevel<>(j, list))
                .map(protoCommitEncoder)
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), k, list, j));
    }

    /**
     * Decodes and pushes a proto-commit, merges the resulting heads into a single head,
     * makes that head the repository's only new head, and returns the data needed to move from
     * the pushed commit to the merged head.
     *
     * <p>Any exception thrown synchronously while decoding or while assembling the promise chain
     * is reported to {@link FatalErrorHandler} and returned as a failed promise.
     *
     * @param pc proto-commit previously produced by {@link #createProtoCommit}
     * @return promise of the fetch data relative to the pushed commit
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<K, D>> push(PC pc) {
        Reactive.checkInReactorThread(this);
        try {
            OTCommit<K, D> commit = protoCommitDecoder.apply(pc);
            return repository.push(commit)
                    .then(repository::getHeads)
                    .then(initialHeads -> OTAlgorithms.excludeParents(repository, otSystem, Utils.union(initialHeads, Set.of(commit.getId())))
                            .then(heads -> OTAlgorithms.mergeAndPush(repository, otSystem, heads))
                            .then(mergeHead -> {
                                Set<K> mergeHeadSet = Set.of(mergeHead);
                                // Heads to drop are computed from the heads read right after the push,
                                // not from the parent-excluded set that was merged.
                                return repository.updateHeads(mergeHeadSet, Utils.difference(initialHeads, mergeHeadSet))
                                        .then(() -> doFetch(mergeHeadSet, commit.getId()));
                            }))
                    .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), pc));
        } catch (Exception e) {
            FatalErrorHandler.handleError(e, this);
            return Promise.ofException(e);
        }
    }

    /**
     * Produces the full state at the current repository heads.
     *
     * <p>Walks from the heads via {@link OTAlgorithms#findParent} until a commit with a stored
     * snapshot is found, prepends that snapshot to the diffs accumulated on the way, and then
     * appends whatever {@link #fetch} reports relative to the found commit. The combined list is
     * squashed by the OT system before being returned.
     *
     * @return promise of the fetch data whose diffs describe the complete checked-out state
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<K, D>> checkout() {
        Reactive.checkInReactorThread(this);
        // Holds the snapshot of the commit that satisfied the predicate below, so that it
        // does not have to be loaded a second time once findParent completes.
        Ref<List<D>> snapshotRef = new Ref<>();
        return repository.getHeads()
                .then(heads -> OTAlgorithms.findParent(repository, otSystem, heads, DiffsReducer.toList(),
                        commit -> repository.loadSnapshot(commit.getId())
                                .map(maybeSnapshot -> {
                                    List<D> snapshot = maybeSnapshot.orElse(null);
                                    snapshotRef.value = snapshot;
                                    return snapshot != null;
                                })))
                .then(findResult -> Promise.of(new AsyncOTUplink.FetchData<>(
                        findResult.getChild(),
                        findResult.getChildLevel(),
                        Utils.concat(snapshotRef.value, findResult.getAccumulatedDiffs()))))
                .then(checkoutData -> fetch(checkoutData.commitId())
                        // Combine the checkout diffs (snapshot + accumulated) with the freshly fetched
                        // ones; the two lambda parameters must stay distinct for this to be correct.
                        .map(fetchData -> new AsyncOTUplink.FetchData<>(
                                fetchData.commitId(),
                                fetchData.level(),
                                otSystem.squash(Utils.concat(checkoutData.diffs(), fetchData.diffs())))))
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod()));
    }

    /**
     * Reads the current heads from the repository and computes the fetch data relative to {@code k}.
     *
     * @param k identifier of the commit the caller is currently at
     * @return promise of the fetch data relative to {@code k}
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<K, D>> fetch(K k) {
        Reactive.checkInReactorThread(this);
        return repository.getHeads()
                .then(heads -> doFetch(heads, k))
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), k));
    }

    /**
     * Polls the repository heads (through {@link PollSanitizer}) and, once the retry condition
     * built from "polled heads do not contain {@code k}" is met, computes the fetch data relative
     * to {@code k}. How errors end the retry loop is decided by
     * {@link PromisePredicates#isResultOrException} (presumably an exception also stops retrying
     * - confirm against that helper).
     *
     * @param k identifier of the commit the caller is currently at
     * @return promise of the fetch data relative to {@code k}
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<K, D>> poll(K k) {
        Reactive.checkInReactorThread(this);
        return Promises.retry(
                        PromisePredicates.isResultOrException((Set<K> polledHeads) -> !polledHeads.contains(k)),
                        PollSanitizer.create(repository.pollHeads()))
                .then(heads -> doFetch(heads, k));
    }

    /**
     * Searches from {@code heads} for the commit whose id equals {@code currentCommitId} and
     * returns the child reported by the search together with the squashed diffs accumulated
     * along the way.
     */
    private Promise<AsyncOTUplink.FetchData<K, D>> doFetch(Set<K> heads, K currentCommitId) {
        return OTAlgorithms.findParent(repository, otSystem, heads, DiffsReducer.toSquashedList(otSystem),
                        AsyncPredicate.of(commit -> commit.getId().equals(currentCommitId)))
                .map(findResult -> new AsyncOTUplink.FetchData<>(
                        findResult.getChild(),
                        findResult.getChildLevel(),
                        otSystem.squash(findResult.getAccumulatedDiffs())))
                .whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), currentCommitId));
    }
}
