package io.activej.ot.uplink;

import io.activej.common.Utils;
import io.activej.ot.TransformResult;
import io.activej.ot.system.OTSystem;
import io.activej.ot.uplink.AsyncOTUplink;
import io.activej.promise.Promise;
import io.activej.promise.PromisePredicates;
import io.activej.promise.Promises;
import io.activej.promise.SettableCallback;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;

/* loaded from: input_file:io/activej/ot/uplink/StorageOTUplink.class */
public final class StorageOTUplink<K, D> extends AbstractReactive implements AsyncOTUplink<Long, D, ProtoCommit<D>> {
    /**
     * Commit id {@code 1}.
     * NOTE(review): {@code checkout()} passes the literal {@code 1L} as the id of the commit it
     * initialises the storage with; presumably that literal is this constant inlined by the
     * compiler - confirm against the original sources.
     */
    public static final long FIRST_COMMIT_ID = 1;
    /**
     * Level value {@code 0}.
     * NOTE(review): every {@code FetchData} built in this class and the call to
     * {@code uplink.createProtoCommit} use the literal {@code 0L} as the level; presumably that
     * literal is this constant inlined by the compiler - confirm.
     */
    public static final int NO_LEVEL = 0;
    /** Local commit storage that backs {@code checkout}, {@code push}, {@code fetch} and {@code poll}. */
    private final AsyncStorage<K, D> storage;
    /** OT system used to transform diff lists against each other in {@code completeSync} and {@code doPush}. */
    private final OTSystem<D> otSystem;
    /**
     * Upstream uplink: checked out from when the storage has no snapshot, and the target of the
     * proto commits created and pushed by {@code startSync()} / {@code sync()}.
     */
    private final AsyncOTUplink<K, D, Object> uplink;

    /* loaded from: input_file:io/activej/ot/uplink/StorageOTUplink$AsyncStorage.class */
    /**
     * Asynchronous commit storage used by {@link StorageOTUplink}.
     *
     * <p>Local commits are addressed by {@code long} ids; commits of the upstream uplink are
     * addressed by {@code K}.
     *
     * @param <K> type of the upstream uplink's commit ids
     * @param <D> type of diffs
     */
    public interface AsyncStorage<K, D> {

        /* loaded from: input_file:io/activej/ot/uplink/StorageOTUplink$AsyncStorage$SyncData.class */
        /**
         * Synchronisation state of the storage relative to the upstream uplink.
         *
         * @param <K> type of the upstream uplink's commit ids
         * @param <D> type of diffs
         */
        public static final class SyncData<K, D> {
            private final long commitId;
            private final K uplinkCommitId;
            private final long uplinkLevel;
            private final List<D> uplinkDiffs;

            @Nullable
            private final Object protoCommit;

            /**
             * @param commitId       local commit id this state refers to
             * @param uplinkCommitId upstream commit id this state refers to
             * @param uplinkLevel    level associated with {@code uplinkCommitId}
             * @param uplinkDiffs    diffs associated with the upstream commit (stored as is, not copied)
             * @param protoCommit    upstream proto commit of a sync in progress, or {@code null} when none
             */
            public SyncData(long commitId, K uplinkCommitId, long uplinkLevel, List<D> uplinkDiffs, @Nullable Object protoCommit) {
                this.commitId = commitId;
                this.uplinkCommitId = uplinkCommitId;
                this.uplinkLevel = uplinkLevel;
                this.uplinkDiffs = uplinkDiffs;
                this.protoCommit = protoCommit;
            }

            /** Returns the local commit id. */
            public long getCommitId() {
                return this.commitId;
            }

            /** Returns the upstream commit id. */
            public K getUplinkCommitId() {
                return this.uplinkCommitId;
            }

            /** Returns the level associated with the upstream commit. */
            public long getUplinkLevel() {
                return this.uplinkLevel;
            }

            /** Returns the stored diff list (the same instance that was passed to the constructor). */
            public List<D> getUplinkDiffs() {
                return this.uplinkDiffs;
            }

            /** Returns the upstream proto commit of the sync in progress, or {@code null} when there is none. */
            @Nullable
            public Object getProtoCommit() {
                return this.protoCommit;
            }

            /** Returns {@code true} when a proto commit is present, i.e. a sync has been started. */
            public boolean isSyncing() {
                return this.protoCommit != null;
            }
        }

        /**
         * Initialises the storage with its first commit.
         *
         * @param commitId       id of the initial commit ({@code StorageOTUplink.checkout()} passes {@code 1})
         * @param diffs          diffs of the snapshot checked out from the upstream uplink
         * @param uplinkCommitId upstream commit id the snapshot belongs to
         * @param uplinkLevel    level of that upstream commit
         * @return promise of a flag; {@code StorageOTUplink.checkout()} re-reads the snapshot and
         *         retries when the flag is {@code false}
         */
        Promise<Boolean> init(long commitId, List<D> diffs, K uplinkCommitId, long uplinkLevel);

        /**
         * Loads the stored snapshot.
         *
         * @return promise of the snapshot; {@code StorageOTUplink.checkout()} treats a {@code null}
         *         result as "no snapshot yet" and falls back to the upstream uplink
         */
        Promise<AsyncOTUplink.FetchData<Long, D>> getSnapshot();

        /** Returns a promise of the head commit id; {@link #fetch(long)} uses it as its inclusive upper bound. */
        Promise<Long> getHead();

        /**
         * Collects the diffs of all commits with ids in {@code (commitId, head]}, in ascending id order.
         *
         * <p>Commits are loaded one at a time through {@link #getCommit(long)}. When
         * {@code commitId} is already at or past the head, the resulting diff list is empty.
         *
         * @param commitId id of the last commit the caller already has
         * @return promise of the head id, level {@code 0} and the concatenated diffs
         */
        default Promise<AsyncOTUplink.FetchData<Long, D>> fetch(long commitId) {
            List<D> diffs = new ArrayList<>();
            return getHead().then(headCommitId ->
                Promises.loop(Long.valueOf(commitId + 1),
                        // Continue while the next id has not passed the head; comparing the id
                        // with itself (as the decompiled code did) would never terminate.
                        nextId -> nextId <= headCommitId,
                        nextId -> getCommit(nextId).map(commit -> {
                            diffs.addAll(commit.getDiffs());
                            return nextId + 1;
                        }))
                    .map(ignored -> new AsyncOTUplink.FetchData<>(headCommitId, 0L, diffs)));
        }

        /**
         * Same contract as {@link #fetch(long)}; this default implementation simply delegates to it.
         *
         * @param commitId id of the last commit the caller already has
         */
        default Promise<AsyncOTUplink.FetchData<Long, D>> poll(long commitId) {
            return fetch(commitId);
        }

        /**
         * Loads a single commit.
         *
         * @param commitId id of the commit to load
         */
        Promise<ProtoCommit<D>> getCommit(long commitId);

        /**
         * Tries to append diffs on top of the given commit.
         *
         * @param commitId id of the commit the diffs are based on
         * @param diffs    diffs to append
         * @return promise of a flag; on {@code true} {@code StorageOTUplink.doPush} reports
         *         {@code commitId + 1} as the new commit id, on {@code false} it fetches newer
         *         commits, transforms the diffs and retries
         */
        Promise<Boolean> add(long commitId, List<D> diffs);

        /** Returns a promise of the current synchronisation state. */
        Promise<SyncData<K, D>> getSyncData();

        /** Returns a promise of {@link SyncData#isSyncing()} evaluated on {@link #getSyncData()}. */
        default Promise<Boolean> isSyncing() {
            return getSyncData().map(SyncData::isSyncing);
        }

        /**
         * Records that a sync with the upstream uplink has been started.
         *
         * @param commitId       local head id the sync was started at
         * @param uplinkCommitId upstream commit id the proto commit is based on
         * @param protoCommit    proto commit created by the upstream uplink
         */
        Promise<Void> startSync(long commitId, K uplinkCommitId, Object protoCommit);

        /**
         * Tries to finish a sync that was started with {@link #startSync(long, Object, Object)}.
         *
         * @param commitId         local head id the caller based {@code diffs} on
         * @param accumulatedDiffs local diffs transformed against the upstream diffs, accumulated
         *                         by {@code StorageOTUplink.completeSync}
         * @param uplinkCommitId   upstream commit id returned by the upstream push
         * @param uplinkLevel      level returned by the upstream push
         * @param diffs            upstream diffs transformed against the local diffs
         * @return promise of a flag; {@code StorageOTUplink.completeSync} fetches, transforms and
         *         retries when the flag is {@code false}
         */
        Promise<Boolean> completeSync(long commitId, List<D> accumulatedDiffs, K uplinkCommitId, long uplinkLevel, List<D> diffs);
    }

    /* loaded from: input_file:io/activej/ot/uplink/StorageOTUplink$ProtoCommit.class */
    /**
     * Immutable pair of a commit id and a list of diffs.
     *
     * <p>Instances are only created inside {@code StorageOTUplink} (the constructor is private).
     *
     * @param <D> type of diffs
     */
    public static final class ProtoCommit<D> {
        private final long id;
        private final List<D> diffs;

        /**
         * @param id    commit id carried by this proto commit
         * @param diffs diffs carried by this proto commit (stored as is, not copied)
         */
        private ProtoCommit(long id, List<D> diffs) {
            this.diffs = diffs;
            this.id = id;
        }

        /** Returns the commit id carried by this proto commit. */
        public long getId() {
            return id;
        }

        /** Returns the diff list passed at construction (same instance). */
        public List<D> getDiffs() {
            return diffs;
        }
    }

    /**
     * Creates an uplink that serves commits from {@code storage} and synchronises them with
     * {@code uplink}.
     *
     * <p>NOTE(review): this constructor is private and no factory method is visible in this file.
     *
     * @param reactor  reactor this component is bound to
     * @param storage  local commit storage
     * @param otSystem OT system used to transform diffs
     * @param uplink   upstream uplink; its proto-commit type is opaque to this class
     */
    @SuppressWarnings("unchecked")
    private StorageOTUplink(Reactor reactor, AsyncStorage<K, D> storage, OTSystem<D> otSystem, AsyncOTUplink<K, D, ?> uplink) {
        super(reactor);
        this.otSystem = otSystem;
        this.storage = storage;
        // The wildcard proto-commit type is widened to Object: proto commits are only ever obtained
        // from this very uplink (createProtoCommit) and handed back to it (push), so the unchecked
        // cast cannot feed it a foreign type. Without the cast the assignment does not compile.
        this.uplink = (AsyncOTUplink<K, D, Object>) uplink;
    }

    /**
     * Runs one synchronisation round with the upstream uplink: obtains (or resumes) the sync
     * state via {@code startSync()}, pushes its proto commit upstream and then hands the upstream
     * response to {@code completeSync}.
     *
     * @return promise that completes when {@code completeSync} signals its callback
     */
    public Promise<Void> sync() {
        return startSync().then(pending -> {
            Promise<AsyncOTUplink.FetchData<K, D>> pushed = this.uplink.push(pending.getProtoCommit());
            return pushed.then(uplinkData -> Promise.<Void>ofCallback(done ->
                // Starts with an empty accumulator; completeSync fills it while retrying.
                completeSync(pending.getCommitId(), new ArrayList<>(), uplinkData.commitId(), uplinkData.level(), uplinkData.diffs(), done)));
        });
    }

    /**
     * Returns the sync state to work with.
     *
     * <p>If the storage already holds a proto commit, that state is returned unchanged.
     * Otherwise the local diffs made since the stored commit id are fetched, an upstream proto
     * commit is created from the stored uplink diffs followed by those local diffs, and the
     * start of the sync is recorded in the storage.
     *
     * @return promise of the existing or freshly created sync state
     */
    Promise<AsyncStorage.SyncData<K, D>> startSync() {
        return this.storage.getSyncData().then(current -> {
            if (current.getProtoCommit() != null) {
                // A sync is already in progress - reuse it.
                return Promise.of(current);
            }
            return this.storage.fetch(current.getCommitId()).then(fetched -> {
                long headId = fetched.commitId();
                List<D> localDiffs = fetched.diffs();
                List<D> pendingDiffs = Utils.concat(current.getUplinkDiffs(), localDiffs);
                return this.uplink.createProtoCommit(current.getUplinkCommitId(), pendingDiffs, 0L)
                    .then(protoCommit -> this.storage.startSync(headId, current.getUplinkCommitId(), protoCommit)
                        .map(ignored -> new AsyncStorage.SyncData<K, D>(
                            headId, current.getUplinkCommitId(), current.getUplinkLevel(), localDiffs, protoCommit)));
            });
        });
    }

    /**
     * Finishes a sync: rebases the upstream diffs over local commits made since {@code commitId}
     * and asks the storage to complete the sync, retrying for as long as the storage answers
     * {@code false}.
     *
     * <p>Each retry restarts from the head that was just fetched. The upstream diffs passed to
     * the retry are already transformed onto that head, so fetching from the original
     * {@code commitId} again would transform diff lists with different base states and append
     * already-accumulated diffs a second time. {@code doPush} restarts from the fetched head in
     * the same way.
     *
     * @param commitId         local commit id that {@code uplinkDiffs} are based on
     * @param accumulatedDiffs mutable accumulator of local diffs transformed against the upstream
     *                         diffs; appended to on every attempt
     * @param uplinkCommitId   upstream commit id returned by the upstream push
     * @param uplinkLevel      level returned by the upstream push
     * @param uplinkDiffs      upstream diffs, applicable on top of local commit {@code commitId}
     * @param settableCallback completed with {@code null} on success, or with the exception of the
     *                         first failing storage or transform step
     */
    void completeSync(long commitId, List<D> accumulatedDiffs, K uplinkCommitId, long uplinkLevel, List<D> uplinkDiffs, SettableCallback<Void> settableCallback) {
        this.storage.fetch(commitId)
            .whenResult(fetchData -> {
                TransformResult<D> transformed = this.otSystem.transform(uplinkDiffs, fetchData.diffs());
                accumulatedDiffs.addAll(transformed.left);
                long fetchedHead = fetchData.commitId();
                this.storage.completeSync(fetchedHead, accumulatedDiffs, uplinkCommitId, uplinkLevel, transformed.right)
                    .whenResult(completed -> {
                        if (completed) {
                            settableCallback.set(null);
                        } else {
                            // transformed.right is based on fetchedHead, so the retry must start there.
                            completeSync(fetchedHead, accumulatedDiffs, uplinkCommitId, uplinkLevel, transformed.right, settableCallback);
                        }
                    })
                    .whenException(settableCallback::setException);
            })
            .whenException(settableCallback::setException);
    }

    /**
     * Returns the current state of the storage as a snapshot followed by all later diffs.
     *
     * <p>When the storage has no snapshot yet, one is checked out from the upstream uplink and
     * the storage is initialised with it as commit {@code 1}. If the storage reports that
     * initialisation did not succeed, the snapshot is read again (retry loop). The snapshot diffs
     * are then extended with the diffs of every commit stored after the snapshot.
     *
     * @return promise of the head commit id, level {@code 0}, and the snapshot diffs followed by
     *         the diffs fetched after the snapshot
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<Long, D>> checkout() {
        Reactive.checkInReactorThread(this);
        // Retries until a non-null snapshot (or an exception) is produced.
        Promise<AsyncOTUplink.FetchData<Long, D>> snapshotPromise = Promises.retry(
            PromisePredicates.isResultOrException(Objects::nonNull),
            () -> this.storage.getSnapshot().then(localSnapshot -> {
                if (localSnapshot != null) {
                    return Promise.of(localSnapshot);
                }
                return this.uplink.checkout().then(uplinkSnapshot ->
                    // 1L / 0L: first commit id and "no level" (the values of FIRST_COMMIT_ID / NO_LEVEL).
                    this.storage.init(1L, uplinkSnapshot.diffs(), uplinkSnapshot.commitId(), uplinkSnapshot.level())
                        .map(initialized -> initialized
                            ? new AsyncOTUplink.FetchData<Long, D>(1L, 0L, uplinkSnapshot.diffs())
                            : null)); // null makes the retry loop read the snapshot again
            }));
        return snapshotPromise.then(snapshot ->
            this.storage.fetch(snapshot.commitId()).map(newer ->
                // Snapshot diffs first, then whatever was committed after the snapshot.
                new AsyncOTUplink.FetchData<Long, D>(newer.commitId(), 0L, Utils.concat(snapshot.diffs(), newer.diffs()))));
    }

    /**
     * Wraps the given commit id and diffs into a {@link ProtoCommit}; nothing is stored yet.
     *
     * @param l    commit id the diffs are based on (unboxed; must not be {@code null})
     * @param list diffs of the proto commit
     * @param j    level argument required by the interface; not used by this implementation
     * @return an already completed promise of the new proto commit
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<ProtoCommit<D>> createProtoCommit(Long l, List<D> list, long j) {
        Reactive.checkInReactorThread(this);
        ProtoCommit<D> protoCommit = new ProtoCommit<>(l, list);
        return Promise.of(protoCommit);
    }

    /**
     * Pushes a proto commit into the storage by delegating to {@code doPush} with an empty list
     * of already-fetched diffs.
     *
     * @param protoCommit proto commit to push
     * @return promise completed by {@code doPush} through the callback
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<Long, D>> push(ProtoCommit<D> protoCommit) {
        Reactive.checkInReactorThread(this);
        // The proto commit is read inside the callback, exactly where doPush is invoked.
        return Promise.ofCallback(resultCallback ->
            doPush(protoCommit.getId(), protoCommit.getDiffs(), List.of(), resultCallback));
    }

    /**
     * Tries to add diffs to the storage on top of commit {@code j}, rebasing and retrying while
     * the storage refuses them.
     *
     * <p>On refusal the commits after {@code j} are fetched, the diffs are transformed against
     * them, and the push is repeated from the fetched commit id with the transformed diffs.
     *
     * @param j                commit id the diffs are based on
     * @param list             diffs to add
     * @param list2            diffs gathered on earlier attempts, returned to the caller on success
     * @param settableCallback receives {@code FetchData(j + 1, 0, list2)} on success, or the
     *                         exception of the first failing step
     */
    void doPush(long j, List<D> list, List<D> list2, SettableCallback<AsyncOTUplink.FetchData<Long, D>> settableCallback) {
        this.storage.add(j, list)
            .whenResult(added -> {
                if (!added) {
                    // Storage refused the diffs: rebase them over the newer commits and try again.
                    this.storage.fetch(j)
                        .whenResult(newer -> {
                            TransformResult<D> rebased = this.otSystem.transform(newer.diffs(), list);
                            long newerHead = newer.commitId();
                            doPush(newerHead, rebased.left, Utils.concat(list2, rebased.right), settableCallback);
                        })
                        .whenException(settableCallback::setException);
                    return;
                }
                settableCallback.set(new AsyncOTUplink.FetchData<>(Long.valueOf(j + 1), 0L, list2));
            })
            .whenException(settableCallback::setException);
    }

    /**
     * Fetches everything stored after the given commit; delegates to the storage's {@code fetch}.
     *
     * @param l id of the last commit the caller already has (unboxed; must not be {@code null})
     * @return the promise returned by the storage
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<Long, D>> fetch(Long l) {
        Reactive.checkInReactorThread(this);
        long lastKnownId = l;
        return storage.fetch(lastKnownId);
    }

    /**
     * Polls for changes after the given commit; delegates to the storage's {@code poll}.
     *
     * @param l id of the last commit the caller already has (unboxed; must not be {@code null})
     * @return the promise returned by the storage
     */
    @Override // io.activej.ot.uplink.AsyncOTUplink
    public Promise<AsyncOTUplink.FetchData<Long, D>> poll(Long l) {
        Reactive.checkInReactorThread(this);
        long lastKnownId = l;
        return storage.poll(lastKnownId);
    }
}
