package io.activej.ot;

import io.activej.async.exception.AsyncCloseException;
import io.activej.async.function.AsyncRunnable;
import io.activej.async.function.AsyncRunnables;
import io.activej.async.function.AsyncSupplier;
import io.activej.async.function.AsyncSuppliers;
import io.activej.async.process.AbstractAsyncCloseable;
import io.activej.async.process.AsyncExecutors;
import io.activej.async.service.ReactiveService;
import io.activej.async.util.LogUtils;
import io.activej.common.ApplicationSettings;
import io.activej.common.Checks;
import io.activej.common.Utils;
import io.activej.common.builder.AbstractBuilder;
import io.activej.ot.OTState;
import io.activej.ot.StateManager;
import io.activej.ot.exception.TransformException;
import io.activej.ot.system.OTSystem;
import io.activej.ot.uplink.AsyncOTUplink;
import io.activej.promise.Promise;
import io.activej.promise.Promises;
import io.activej.promise.RetryPolicy;
import io.activej.promise.SettablePromise;
import io.activej.reactor.AbstractReactive;
import io.activej.reactor.Reactive;
import io.activej.reactor.Reactor;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/activej/ot/OTStateManager.class */
public final class OTStateManager<K, D, S extends OTState<D>> extends AbstractReactive implements ReactiveService, StateManager<D, S> {
    private static final Logger logger;
    public static final int STATE_SUBSCRIBER_BUFFER_SIZE;
    private final OTSystem<D> otSystem;
    private final AsyncOTUplink<K, D, Object> uplink;
    private final AsyncSupplier<Boolean> fetch;
    private final Set<OTStateManager<K, D, S>.StateChangesListener> listeners;
    private S state;

    @Nullable
    private K commitId;

    @Nullable
    private K originCommitId;
    private long level;
    private long originLevel;
    private List<D> workingDiffs;
    private List<D> originDiffs;

    @Nullable
    private Object pendingProtoCommit;

    @Nullable
    private List<D> pendingProtoCommitDiffs;
    private final AsyncRunnable sync;
    private boolean isSyncing;

    @Nullable
    private AsyncRunnable poll;
    private boolean isPolling;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/activej/ot/OTStateManager$Builder.class */
    public final class Builder extends AbstractBuilder<OTStateManager<K, D, S>.Builder, OTStateManager<K, D, S>> {
        private Builder() {
        }

        public OTStateManager<K, D, S>.Builder withPoll() {
            checkNotBuilt(this);
            return withPoll(UnaryOperator.identity());
        }

        public OTStateManager<K, D, S>.Builder withPoll(RetryPolicy<?> retryPolicy) {
            checkNotBuilt(this);
            return withPoll(asyncRunnable -> {
                return AsyncRunnables.ofExecutor(AsyncExecutors.retry(retryPolicy), asyncRunnable);
            });
        }

        public OTStateManager<K, D, S>.Builder withPoll(UnaryOperator<AsyncRunnable> unaryOperator) {
            checkNotBuilt(this);
            OTStateManager oTStateManager = OTStateManager.this;
            OTStateManager oTStateManager2 = OTStateManager.this;
            oTStateManager.poll = (AsyncRunnable) unaryOperator.apply(oTStateManager2::doPoll);
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doBuild, reason: merged with bridge method [inline-methods] */
        public OTStateManager<K, D, S> m4doBuild() {
            return OTStateManager.this;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/activej/ot/OTStateManager$StateChangesListener.class */
    public final class StateChangesListener extends AbstractAsyncCloseable implements StateManager.StateChangesSupplier<D> {
        private final Queue<D> diffs = new ArrayDeque(OTStateManager.STATE_SUBSCRIBER_BUFFER_SIZE);
        private SettablePromise<D> pending;

        public StateChangesListener() {
            OTStateManager.this.listeners.add(this);
        }

        void onStateChange(D d) {
            if (isClosed()) {
                return;
            }
            if (this.pending != null) {
                SettablePromise<D> settablePromise = this.pending;
                this.pending = null;
                settablePromise.set(d);
            } else if (this.diffs.size() > OTStateManager.STATE_SUBSCRIBER_BUFFER_SIZE) {
                closeEx(new AsyncCloseException("State changes rate exceed supplier rate"));
            } else {
                this.diffs.add(d);
            }
        }

        public Promise<D> get() {
            if (isClosed()) {
                return Promise.ofException(getException());
            }
            D poll = this.diffs.poll();
            if (poll != null) {
                return Promise.of(poll);
            }
            Checks.checkState(this.pending == null, "Previous get() has not finished yet");
            this.pending = new SettablePromise<>();
            return this.pending;
        }

        protected void onClosed(Exception exc) {
            if (this.pending == null) {
                return;
            }
            this.pending.setException(exc);
            this.pending = null;
        }

        protected void onCleanup() {
            OTStateManager.this.listeners.remove(this);
        }
    }

    private OTStateManager(Reactor reactor, OTSystem<D> oTSystem, AsyncOTUplink<K, D, ?> asyncOTUplink, S s) {
        super(reactor);
        this.fetch = AsyncSuppliers.reuse(this::doFetch);
        this.listeners = new HashSet();
        this.workingDiffs = new ArrayList();
        this.originDiffs = List.of();
        this.sync = AsyncRunnables.reuse(this::doSync);
        this.otSystem = oTSystem;
        this.uplink = asyncOTUplink;
        this.state = s;
    }

    public static <K, D, S extends OTState<D>> OTStateManager<K, D, S> create(Reactor reactor, OTSystem<D> oTSystem, AsyncOTUplink<K, D, ?> asyncOTUplink, S s) {
        return (OTStateManager) builder(reactor, oTSystem, asyncOTUplink, s).build();
    }

    public static <K, D, S extends OTState<D>> OTStateManager<K, D, S>.Builder builder(Reactor reactor, OTSystem<D> oTSystem, AsyncOTUplink<K, D, ?> asyncOTUplink, S s) {
        return new Builder();
    }

    @Override // io.activej.ot.StateManager
    public Promise<Void> catchUp() {
        return this.sync.run();
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.activej.ot.StateManager
    public Promise<Void> push(List<D> list) {
        addAll(list);
        return catchUp().whenException(exc -> {
            reset();
        });
    }

    @Override // io.activej.ot.StateManager
    public StateManager.StateChangesSupplier<D> subscribeToStateChanges() {
        return new StateChangesListener();
    }

    @Override // io.activej.ot.StateManager
    public <R> R query(Function<S, R> function) {
        return function.apply(this.state);
    }

    public Promise<?> start() {
        Reactive.checkInReactorThread(this);
        return checkout().whenResult(this::poll);
    }

    public Promise<?> stop() {
        Reactive.checkInReactorThread(this);
        this.poll = null;
        return isValid() ? catchUp().whenComplete(this::invalidateInternalState) : Promise.complete();
    }

    public Promise<Void> checkout() {
        Reactive.checkInReactorThread(this);
        Checks.checkState(this.commitId == null);
        return this.uplink.checkout().whenResult(fetchData -> {
            this.state.init();
            apply(fetchData.diffs());
            K k = (K) fetchData.commitId();
            this.originCommitId = k;
            this.commitId = k;
            long level = fetchData.level();
            this.originLevel = level;
            this.level = level;
        }).toVoid().whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private boolean isSyncing() {
        return this.isSyncing;
    }

    private boolean isPolling() {
        return this.isPolling;
    }

    public Promise<Void> sync() {
        return this.sync.run();
    }

    public Promise<Boolean> fetch() {
        Reactive.checkInReactorThread(this);
        Checks.checkState(isValid());
        return this.fetch.get();
    }

    private void updateOrigin(AsyncOTUplink.FetchData<K, D> fetchData) {
        if (!$assertionsDisabled && this.pendingProtoCommit != null) {
            throw new AssertionError();
        }
        this.originCommitId = fetchData.commitId();
        this.originLevel = fetchData.level();
        this.originDiffs = this.otSystem.squash(Utils.concat(this.originDiffs, fetchData.diffs()));
    }

    private Promise<Void> doSync() {
        Checks.checkState(isValid());
        this.isSyncing = true;
        AsyncRunnable[] asyncRunnableArr = new AsyncRunnable[4];
        asyncRunnableArr[0] = this::push;
        asyncRunnableArr[1] = this.poll == null ? this::pull : Promise::complete;
        asyncRunnableArr[2] = this::commit;
        asyncRunnableArr[3] = this::push;
        return Promises.sequence(asyncRunnableArr).whenComplete(() -> {
            this.isSyncing = false;
        }).whenComplete(this::poll).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private void poll() {
        if (this.poll == null || isPolling() || this.pendingProtoCommit != null) {
            return;
        }
        this.isPolling = true;
        this.poll.run().async().whenComplete(() -> {
            this.isPolling = false;
        }).whenComplete(() -> {
            if (isSyncing()) {
                return;
            }
            poll();
        });
    }

    private Promise<Void> pull() {
        return fetch().whenResult(this::rebase).toVoid().whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private Promise<Void> doPoll() {
        if (!isValid()) {
            return Promise.complete();
        }
        K k = this.originCommitId;
        return this.uplink.poll(k).whenResult(fetchData -> {
            if (isSyncing() || k != this.originCommitId) {
                return;
            }
            updateOrigin(fetchData);
            if (this.pendingProtoCommit == null) {
                rebase();
            }
        }).toVoid().whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private Promise<Boolean> doFetch() {
        if (this.pendingProtoCommit != null) {
            return Promise.of(false);
        }
        K k = this.originCommitId;
        return this.uplink.fetch(k).map(fetchData -> {
            if (k != this.originCommitId || this.pendingProtoCommit != null) {
                return false;
            }
            if (fetchData.commitId() == k) {
                return false;
            }
            updateOrigin(fetchData);
            return true;
        }).whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private void rebase() throws TransformException {
        if (!$assertionsDisabled && this.pendingProtoCommit != null) {
            throw new AssertionError();
        }
        if (this.commitId == this.originCommitId) {
            return;
        }
        logger.info("Rebasing - {} {}", this.commitId, this.originCommitId);
        try {
            TransformResult<D> transform = this.otSystem.transform((List) this.otSystem.squash(this.workingDiffs), (List) this.otSystem.squash(this.originDiffs));
            apply(transform.left);
            this.workingDiffs = new ArrayList(transform.right);
            this.commitId = this.originCommitId;
            this.level = this.originLevel;
            this.originDiffs = List.of();
        } catch (TransformException e) {
            invalidateInternalState();
            throw e;
        }
    }

    private Promise<Void> commit() {
        if (!$assertionsDisabled && this.pendingProtoCommit != null) {
            throw new AssertionError();
        }
        if (this.workingDiffs.isEmpty()) {
            return Promise.complete();
        }
        int size = this.workingDiffs.size();
        ArrayList arrayList = new ArrayList(this.otSystem.squash(this.workingDiffs));
        return this.uplink.createProtoCommit(this.commitId, arrayList, this.level).whenResult(obj -> {
            if (!$assertionsDisabled && this.pendingProtoCommit != null) {
                throw new AssertionError();
            }
            this.pendingProtoCommit = obj;
            this.pendingProtoCommitDiffs = arrayList;
            this.workingDiffs = new ArrayList(this.workingDiffs.subList(size, this.workingDiffs.size()));
            resetOrigin();
        }).toVoid().whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    private Promise<Void> push() {
        return this.pendingProtoCommit == null ? Promise.complete() : this.uplink.push(this.pendingProtoCommit).whenResult(fetchData -> {
            this.pendingProtoCommit = null;
            this.pendingProtoCommitDiffs = null;
            if (!$assertionsDisabled && this.commitId != this.originCommitId) {
                throw new AssertionError();
            }
            updateOrigin(fetchData);
            rebase();
        }).toVoid().whenComplete(LogUtils.toLogger(logger, LogUtils.thisMethod(), new Object[]{this}));
    }

    public void reset() {
        Reactive.checkInReactorThread(this);
        Checks.checkState(!isSyncing());
        apply(this.otSystem.invert(Utils.concat(Utils.nonNullElseEmpty(this.pendingProtoCommitDiffs), this.workingDiffs)));
        this.workingDiffs.clear();
        this.pendingProtoCommit = null;
        this.pendingProtoCommitDiffs = null;
        resetOrigin();
    }

    void resetOrigin() {
        this.originCommitId = this.commitId;
        this.originLevel = this.level;
        this.originDiffs = List.of();
    }

    public void add(D d) {
        Checks.checkState(isValid());
        addAll(List.of(d));
    }

    public void addAll(List<? extends D> list) {
        Reactive.checkInReactorThread(this);
        Checks.checkState(isValid());
        try {
            for (D d : list) {
                if (!this.otSystem.isEmpty(d)) {
                    this.workingDiffs.add(d);
                    this.state.apply(d);
                    notifyListeners(d);
                }
            }
        } catch (RuntimeException e) {
            invalidateInternalState();
            throw e;
        }
    }

    private void apply(List<D> list) {
        Reactive.checkInReactorThread(this);
        try {
            for (D d : list) {
                this.state.apply(d);
                notifyListeners(d);
            }
        } catch (RuntimeException e) {
            invalidateInternalState();
            throw e;
        }
    }

    private void notifyListeners(D d) {
        Iterator<OTStateManager<K, D, S>.StateChangesListener> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().onStateChange(d);
        }
    }

    public void invalidateInternalState() {
        Reactive.checkInReactorThread(this);
        this.state = null;
        this.commitId = null;
        this.originCommitId = null;
        this.level = 0L;
        this.originLevel = 0L;
        this.workingDiffs = null;
        this.originDiffs = null;
        this.pendingProtoCommit = null;
        this.pendingProtoCommitDiffs = null;
        this.poll = null;
    }

    public K getCommitId() {
        return (K) Checks.checkNotNull(this.commitId, "Internal state has been invalidated");
    }

    public K getOriginCommitId() {
        return (K) Checks.checkNotNull(this.originCommitId, "Internal state has been invalidated");
    }

    public S getState() {
        return this.state;
    }

    public boolean isValid() {
        return this.commitId != null;
    }

    public boolean hasWorkingDiffs() {
        return !this.workingDiffs.isEmpty();
    }

    public boolean hasPendingCommits() {
        return this.pendingProtoCommit != null;
    }

    public AsyncOTUplink<K, D, ?> getUplink() {
        return this.uplink;
    }

    public String toString() {
        return "{revision=" + this.commitId + (this.originCommitId != this.commitId ? " origin revision=" + this.originCommitId : "") + " workingDiffs:" + (this.workingDiffs != null ? Integer.valueOf(this.workingDiffs.size()) : null) + " pendingCommits:" + (this.pendingProtoCommit != null) + "}";
    }

    static {
        $assertionsDisabled = !OTStateManager.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(OTStateManager.class);
        STATE_SUBSCRIBER_BUFFER_SIZE = ApplicationSettings.getInt(OTState.class, "stateSubscriberBufferSize", 128).intValue();
    }
}
