package io.evitadb.core.transaction.stage;

import io.evitadb.api.exception.TransactionException;
import io.evitadb.core.transaction.TransactionManager;
import io.evitadb.core.transaction.stage.TransactionTask;
import io.evitadb.utils.Assert;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/evitadb/core/transaction/stage/AbstractTransactionStage.class */
/**
 * Base class for a single stage of a transaction-processing pipeline built on
 * {@link java.util.concurrent.Flow}. The stage consumes tasks of type {@code T},
 * lets the concrete subclass process them in {@link #handleNext(TransactionTask)}
 * and may publish resulting tasks of type {@code F} to its own subscribers via
 * {@link #push(TransactionTask, TransactionTask)}.
 *
 * <p>Items are requested from the upstream subscription one at a time: one in
 * {@link #onSubscribe(Flow.Subscription)} and one more after each processed item.
 *
 * <p>Several members are {@code public} only because the decompiler widened them;
 * the decompiler notes state they were originally declared {@code protected}.
 *
 * @param <T> type of the task accepted by this stage
 * @param <F> type of the task published by this stage
 */
public abstract class AbstractTransactionStage<T extends TransactionTask, F extends TransactionTask> extends SubmissionPublisher<F> implements Flow.Processor<T, F> {
    private static final Logger log = LoggerFactory.getLogger(AbstractTransactionStage.class);

    /** Transaction manager this stage belongs to; also the source of the expected catalog name. */
    protected final TransactionManager transactionManager;

    /** Upstream subscription, assigned in {@link #onSubscribe(Flow.Subscription)}. */
    private Flow.Subscription subscription;

    /** Result of the most recent {@code offer(...)} call made by {@link #push(TransactionTask, TransactionTask)}. */
    private volatile int stageHandoff;

    /**
     * Set to {@code true} in {@link #onComplete()}. Declared {@code volatile} (like
     * {@link #stageHandoff}) because it is exposed through the public
     * {@link #isCompleted()} getter, which may be called from a thread other than
     * the one delivering the completion signal.
     */
    private volatile boolean completed;

    /** Callback invoked by {@link #handleException(TransactionTask, Throwable)} for every failed task. */
    @Nonnull
    private final BiConsumer<TransactionTask, Throwable> onException;

    /**
     * Creates the stage (originally declared {@code protected}).
     *
     * @param executor           executor passed to {@link SubmissionPublisher#SubmissionPublisher(Executor, int)}
     * @param i                  maximum buffer capacity passed to the {@link SubmissionPublisher} constructor
     * @param transactionManager transaction manager this stage works for
     * @param biConsumer         callback notified about every task whose processing failed
     */
    public AbstractTransactionStage(@Nonnull Executor executor, int i, @Nonnull TransactionManager transactionManager, @Nonnull BiConsumer<TransactionTask, Throwable> biConsumer) {
        super(executor, i);
        this.transactionManager = transactionManager;
        this.onException = biConsumer;
    }

    /**
     * Remembers the upstream subscription and requests the first item.
     *
     * @param subscription subscription handed over by the upstream publisher
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1L);
    }

    /**
     * Verifies that the task targets the catalog of {@link #transactionManager},
     * delegates the work to {@link #handleNext(TransactionTask)} and requests the
     * next item. Anything thrown by the check or by the handler is routed to
     * {@link #handleException(TransactionTask, Throwable)}.
     *
     * @param t task to process
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public final void onNext(T t) {
        try {
            Assert.isPremiseValid(
                Objects.equals(this.transactionManager.getCatalogName(), t.catalogName()),
                "Catalog name mismatch!"
            );
            handleNext(t);
        } catch (Throwable th) {
            handleException(t, th);
        }
        // not reached when handleException itself throws - the error then propagates to the caller of onNext
        this.subscription.request(1L);
    }

    /**
     * Logs the failure, completes the future of the task exceptionally (when the
     * task carries one) and notifies the exception callback passed to the
     * constructor (originally declared {@code protected}).
     *
     * @param t  task whose processing failed
     * @param th cause of the failure
     */
    public void handleException(@Nonnull T t, @Nonnull Throwable th) {
        // the trailing Throwable is treated by SLF4J as the exception to log, not as a placeholder value
        log.error("Error while processing {} task for catalog `{}`!", getName(), t.catalogName(), th);
        final CompletableFuture<Long> future = t.future();
        if (future != null) {
            future.completeExceptionally(th);
        }
        this.onException.accept(t, th);
    }

    /**
     * Logs an error that was signalled to this subscriber. No other action is taken.
     *
     * @param th error signalled to this subscriber
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public final void onError(Throwable th) {
        log.error(
            "Fatal error! Error propagated outside catalog `{}` transaction stage! This is unexpected and effectively stops transaction processing!",
            this.transactionManager.getCatalogName(), th
        );
    }

    /**
     * Logs the completion signal and marks this stage as completed.
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public final void onComplete() {
        log.debug("Transaction stage completed for catalog `{}`!", this.transactionManager.getCatalogName());
        this.completed = true;
    }

    /**
     * Returns the name of the stage used in log and exception messages.
     *
     * @return name of the stage
     */
    protected abstract String getName();

    /**
     * Processes a single task; invoked from {@link #onNext(TransactionTask)} after
     * the catalog name check passed.
     *
     * @param t task to process
     */
    protected abstract void handleNext(@Nonnull T t);

    /**
     * Offers the target task to the subscribers of this stage and stores the
     * result of the offer in {@link #getStageHandoff()} (originally declared
     * {@code protected}).
     *
     * <p>When the item is dropped, the drop handler reports a
     * {@link TransactionException} through
     * {@link #handleException(TransactionTask, Throwable)} and returns
     * {@code false}, i.e. the offer is not retried. Afterwards the future of the
     * source task is completed with the catalog version of the target task,
     * provided the source task has a future and the target task has none.
     *
     * @param t source task that has been processed by this stage
     * @param f target task to hand over to the next stage
     */
    public void push(@Nonnull T t, @Nonnull F f) {
        this.stageHandoff = offer(f, (subscriber, droppedTask) -> {
            final String lossNote = f.getClass().isAnnotationPresent(NonRepeatableTask.class)
                ? " - some committed data will be lost" : "";
            final String prefix = "The task " + getName() + " is completed, but cannot push " + f.getClass().getSimpleName();
            final String message = sourceAwaitsTargetVersion(t, f)
                ? prefix + " to next stage" + lossNote + "."
                : prefix + lossNote + " to next stage and no one will be informed about it!";
            handleException(t, new TransactionException(message, new RejectedExecutionException()));
            return false;
        });
        // no-op when the drop handler above has already completed the future exceptionally
        completeSourceFuture(t, f);
    }

    /**
     * Completes the future of the source task with the catalog version of the
     * target task without publishing the target task anywhere. Nothing happens
     * when the source task has no future or the target task has a future of its
     * own (originally declared {@code protected}).
     *
     * @param t source task that has been processed by this stage
     * @param f target task carrying the resulting catalog version
     */
    public void terminate(@Nonnull T t, @Nonnull F f) {
        completeSourceFuture(t, f);
    }

    /**
     * Returns the value returned by the last {@code offer(...)} call made in
     * {@link #push(TransactionTask, TransactionTask)}; zero until the first push.
     *
     * @return result of the most recent offer
     */
    public int getStageHandoff() {
        return this.stageHandoff;
    }

    /**
     * Tells whether {@link #onComplete()} has been signalled to this stage.
     *
     * @return {@code true} once the completion signal has been received
     */
    public boolean isCompleted() {
        return this.completed;
    }

    /**
     * Tells whether the source task has a future while the target task has none.
     */
    private static boolean sourceAwaitsTargetVersion(@Nonnull TransactionTask source, @Nonnull TransactionTask target) {
        return source.future() != null && target.future() == null;
    }

    /**
     * Completes the future of the source task with the catalog version of the
     * target task when {@link #sourceAwaitsTargetVersion(TransactionTask, TransactionTask)} holds.
     */
    private static void completeSourceFuture(@Nonnull TransactionTask source, @Nonnull TransactionTask target) {
        if (sourceAwaitsTargetVersion(source, target)) {
            source.future().complete(Long.valueOf(target.catalogVersion()));
        }
    }
}
