package io.reacted.core.mailboxes;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reacted.core.messages.Message;
import io.reacted.core.messages.reactors.DeliveryStatus;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.utils.ObjectUtils;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NonNullByDefault
/* loaded from: input_file:io/reacted/core/mailboxes/BackpressuringMbox.class */
public class BackpressuringMbox implements MailBox {
    public static final Duration BEST_EFFORT_TIMEOUT = Duration.ZERO;
    public static final Duration RELIABLE_DELIVERY_TIMEOUT = Duration.ofNanos(Long.MAX_VALUE);
    private static final Logger LOGGER = LoggerFactory.getLogger(BackpressuringMbox.class);
    private final Duration backpressureTimeout;
    private final MailBox realMbox;
    private final SubmissionPublisher<DeliveryRequest> backpressurer;
    private final BackpressuringSubscriber reliableBackpressuringSubscriber;
    private final Set<Class<? extends Serializable>> notDelayed;
    private final Set<Class<? extends Serializable>> notBackpressurable;
    private final ExecutorService sequencer;
    private final boolean isPrivateSequencer;

    /* loaded from: input_file:io/reacted/core/mailboxes/BackpressuringMbox$Builder.class */
    public static class Builder {
        private MailBox realMbox;
        private Duration backpressureTimeout;
        private int bufferSize;
        private int requestOnStartup;
        private Executor ayncBackpressurer;

        @Nullable
        private ThreadPoolExecutor sequencer;
        private ReActorContext realMailboxOwner;
        private Set<Class<? extends Serializable>> notDelayable = Set.of();
        private Set<Class<? extends Serializable>> notBackpressurable = Set.of();

        private Builder() {
        }

        public Builder setRealMbox(MailBox mailBox) {
            this.realMbox = mailBox;
            return this;
        }

        public Builder setBackpressureTimeout(Duration duration) {
            this.backpressureTimeout = duration;
            return this;
        }

        public Builder setBufferSize(int i) {
            this.bufferSize = i;
            return this;
        }

        public Builder setRequestOnStartup(int i) {
            this.requestOnStartup = i;
            return this;
        }

        public Builder setAsyncBackpressurer(Executor executor) {
            this.ayncBackpressurer = executor;
            return this;
        }

        public Builder setSequencer(@Nullable ThreadPoolExecutor threadPoolExecutor) {
            this.sequencer = threadPoolExecutor;
            return this;
        }

        public Builder setNonDelayable(Set<Class<? extends Serializable>> set) {
            this.notDelayable = set;
            return this;
        }

        public Builder setNonBackpressurable(Set<Class<? extends Serializable>> set) {
            this.notBackpressurable = set;
            return this;
        }

        public Builder setRealMailboxOwner(ReActorContext reActorContext) {
            this.realMailboxOwner = reActorContext;
            return this;
        }

        public BackpressuringMbox build() {
            return new BackpressuringMbox(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/reacted/core/mailboxes/BackpressuringMbox$DeliveryRequest.class */
    public static class DeliveryRequest {
        final Message deliveryPayload;
        final CompletableFuture<Try<DeliveryStatus>> pendingTrigger;

        private DeliveryRequest(Message message, CompletableFuture<Try<DeliveryStatus>> completableFuture) {
            this.deliveryPayload = message;
            this.pendingTrigger = completableFuture;
        }
    }

    private BackpressuringMbox(Builder builder) {
        ReActorContext reActorContext = (ReActorContext) Objects.requireNonNull(builder.realMailboxOwner);
        this.backpressureTimeout = (Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(builder.backpressureTimeout), duration -> {
            return duration.compareTo(RELIABLE_DELIVERY_TIMEOUT) <= 0;
        }, () -> {
            return new IllegalArgumentException("Invalid backpressure timeout");
        });
        this.realMbox = (MailBox) Objects.requireNonNull(builder.realMbox);
        this.notDelayed = (Set) Objects.requireNonNull(builder.notDelayable);
        this.notBackpressurable = (Set) Objects.requireNonNull(builder.notBackpressurable);
        int intValue = ((Integer) ObjectUtils.requiredInRange(Integer.valueOf(builder.bufferSize), 1, Integer.MAX_VALUE, IllegalArgumentException::new)).intValue();
        int intValue2 = ((Integer) ObjectUtils.requiredInRange(Integer.valueOf(builder.requestOnStartup), 0, Integer.MAX_VALUE, IllegalArgumentException::new)).intValue();
        ThreadFactory build = new ThreadFactoryBuilder().setUncaughtExceptionHandler((thread, th) -> {
            LOGGER.error("Uncaught exception in {} delivery thread", BackpressuringMbox.class.getSimpleName(), th);
        }).build();
        if (builder.sequencer == null) {
            this.isPrivateSequencer = true;
            this.sequencer = Executors.newSingleThreadExecutor(build);
        } else {
            ObjectUtils.requiredInRange(Integer.valueOf(builder.sequencer.getMaximumPoolSize()), 0, 1, IllegalArgumentException::new);
            this.sequencer = builder.sequencer;
            this.isPrivateSequencer = false;
        }
        this.backpressurer = new SubmissionPublisher<>((Executor) Objects.requireNonNull(builder.ayncBackpressurer), intValue);
        MailBox mailBox = this.realMbox;
        Objects.requireNonNull(mailBox);
        this.reliableBackpressuringSubscriber = new BackpressuringSubscriber(intValue2, reActorContext, mailBox::deliver, this.backpressurer);
        this.backpressurer.subscribe(this.reliableBackpressuringSubscriber);
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public boolean isEmpty() {
        return this.realMbox.isEmpty();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public boolean isFull() {
        return this.realMbox.isFull();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public long getMsgNum() {
        return this.realMbox.getMsgNum() + Integer.max(this.backpressurer.estimateMaximumLag(), 0);
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public long getMaxSize() {
        return this.realMbox.getMaxSize();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public Message getNextMessage() {
        return this.realMbox.getNextMessage();
    }

    @Override // io.reacted.core.mailboxes.MailBox
    public DeliveryStatus deliver(Message message) {
        return this.realMbox.deliver(message);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // io.reacted.core.mailboxes.MailBox
    public CompletionStage<Try<DeliveryStatus>> asyncDeliver(Message message) {
        Class<?> cls = message.getPayload().getClass();
        if (shouldNotBeDelayed(cls)) {
            return CompletableFuture.completedFuture(Try.of(() -> {
                return deliver(message);
            }));
        }
        CompletableFuture completableFuture = new CompletableFuture();
        if (isBestEffort(this.backpressureTimeout) && canBeBackPressured(cls)) {
            reliableDelivery(message, this.backpressureTimeout, completableFuture);
        } else {
            try {
                this.sequencer.execute(() -> {
                    reliableDelivery(message, shouldNotBeBackPressured(cls) ? RELIABLE_DELIVERY_TIMEOUT : this.backpressureTimeout, completableFuture);
                });
            } catch (Exception e) {
                completableFuture.complete(Try.ofFailure(e));
            }
        }
        return completableFuture;
    }

    public void request(long j) {
        ((BackpressuringSubscriber) Objects.requireNonNull(this.reliableBackpressuringSubscriber)).request(j);
    }

    @Override // io.reacted.core.mailboxes.MailBox, java.lang.AutoCloseable
    public void close() throws Exception {
        this.reliableBackpressuringSubscriber.onComplete();
        this.backpressurer.close();
        if (this.isPrivateSequencer) {
            this.sequencer.shutdownNow();
        }
        this.realMbox.close();
    }

    private void reliableDelivery(Message message, Duration duration, CompletableFuture<Try<DeliveryStatus>> completableFuture) {
        try {
            if (this.backpressurer.offer(new DeliveryRequest(message, completableFuture), duration.toNanos(), TimeUnit.NANOSECONDS, BackpressuringMbox::onBackPressure) < 0) {
                completableFuture.complete(Try.ofSuccess(DeliveryStatus.BACKPRESSURED));
            } else {
                completableFuture.complete(Try.ofSuccess(DeliveryStatus.DELIVERED));
            }
        } catch (Exception e) {
            completableFuture.complete(Try.ofFailure(e));
        }
    }

    private boolean isBestEffort(Duration duration) {
        return duration.compareTo(BEST_EFFORT_TIMEOUT) == 0;
    }

    private boolean shouldNotBeDelayed(Class<? extends Serializable> cls) {
        return this.notDelayed.contains(cls);
    }

    private boolean canBeBackPressured(Class<? extends Serializable> cls) {
        return !shouldNotBeBackPressured(cls);
    }

    private boolean shouldNotBeBackPressured(Class<? extends Serializable> cls) {
        return this.notBackpressurable.contains(cls);
    }

    private static boolean onBackPressure(Flow.Subscriber<? super DeliveryRequest> subscriber, DeliveryRequest deliveryRequest) {
        deliveryRequest.pendingTrigger.complete(Try.ofSuccess(DeliveryStatus.BACKPRESSURED));
        return false;
    }
}
