package io.vlingo.xoom.lattice.grid.application;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.Address;
import io.vlingo.xoom.actors.LocalMessage;
import io.vlingo.xoom.actors.Returns;
import io.vlingo.xoom.common.Completes;
import io.vlingo.xoom.common.Scheduler;
import io.vlingo.xoom.lattice.grid.application.GridActorControl;
import io.vlingo.xoom.lattice.grid.application.message.Answer;
import io.vlingo.xoom.lattice.grid.application.message.Decoder;
import io.vlingo.xoom.lattice.grid.application.message.Deliver;
import io.vlingo.xoom.lattice.grid.application.message.Message;
import io.vlingo.xoom.lattice.grid.application.message.Relocate;
import io.vlingo.xoom.lattice.grid.application.message.Start;
import io.vlingo.xoom.lattice.grid.application.message.Visitor;
import io.vlingo.xoom.lattice.grid.application.message.serialization.JavaObjectDecoder;
import io.vlingo.xoom.lattice.grid.hashring.HashRing;
import io.vlingo.xoom.lattice.util.HardRefHolder;
import io.vlingo.xoom.lattice.util.WeakQueue;
import io.vlingo.xoom.wire.message.RawMessage;
import io.vlingo.xoom.wire.node.Id;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/vlingo/xoom/lattice/grid/application/GridApplicationMessageHandler.class */
public final class GridApplicationMessageHandler implements ApplicationMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(GridApplicationMessageHandler.class);
    private final Id localNode;
    private final AtomicBoolean isClusterHealthy;
    private final HashRing<Id> hashRing;
    private final GridActorControl.Inbound inbound;
    private final GridActorControl.Outbound outbound;
    private final Decoder decoder;
    private final Visitor visitor;
    private final Scheduler scheduler;
    private final HardRefHolder holder;
    private final Queue<Runnable> buffer;

    /* loaded from: input_file:io/vlingo/xoom/lattice/grid/application/GridApplicationMessageHandler$ControlMessageVisitor.class */
    final class ControlMessageVisitor implements Visitor {
        ControlMessageVisitor() {
        }

        @Override // io.vlingo.xoom.lattice.grid.application.message.Visitor
        public void visit(Id id, Id id2, Answer answer) {
            GridApplicationMessageHandler.this.inbound.answer(id, id2, answer);
        }

        @Override // io.vlingo.xoom.lattice.grid.application.message.Visitor
        public <T> void visit(Id id, Id id2, Deliver<T> deliver) {
            Id receiver = receiver(id, deliver.address);
            if (receiver == id) {
                GridApplicationMessageHandler.this.inbound.deliver(id, id2, returnsAnswer(id, id2, deliver), deliver.protocol, deliver.address, deliver.definition, deliver.consumer, deliver.representation);
            } else {
                GridApplicationMessageHandler.this.outbound.forward(receiver, id2, deliver);
            }
        }

        @Override // io.vlingo.xoom.lattice.grid.application.message.Visitor
        public <T> void visit(Id id, Id id2, Start<T> start) {
            Id receiver = receiver(id, start.address);
            if (receiver == id) {
                GridApplicationMessageHandler.this.inbound.start(id, id2, start.protocol, start.address, start.definition);
            } else {
                GridApplicationMessageHandler.this.outbound.forward(receiver, id2, start);
            }
        }

        private Id receiver(Id id, Address address) {
            Id id2 = (Id) GridApplicationMessageHandler.this.hashRing.nodeOf(address.idString());
            return (id2 == null || id2.equals(id)) ? id : id2;
        }

        @Override // io.vlingo.xoom.lattice.grid.application.message.Visitor
        public void visit(Id id, Id id2, Relocate relocate) {
            Id receiver = receiver(id, relocate.address);
            if (receiver != id) {
                GridApplicationMessageHandler.this.outbound.forward(receiver, id2, relocate);
            } else {
                GridApplicationMessageHandler.this.inbound.relocate(id, id2, relocate.definition, relocate.address, relocate.snapshot, (List) relocate.pending.stream().map(deliver -> {
                    return new LocalMessage((Actor) null, deliver.protocol, deliver.consumer, returnsAnswer(id, id2, deliver), deliver.representation);
                }).collect(Collectors.toCollection(ArrayList::new)));
            }
        }

        private Returns<?> returnsAnswer(Id id, Id id2, Deliver<?> deliver) {
            Returns<?> value;
            if (deliver.answerCorrelationId == null) {
                value = null;
            } else {
                Completes using = Completes.using(GridApplicationMessageHandler.this.scheduler);
                using.andThen(obj -> {
                    return new Answer(deliver.answerCorrelationId, obj);
                }).recoverFrom(th -> {
                    return new Answer(deliver.answerCorrelationId, th);
                }).otherwise(obj2 -> {
                    return new Answer(deliver.answerCorrelationId, (Throwable) new TimeoutException());
                }).andThenConsume(4000L, answer -> {
                    GridApplicationMessageHandler.this.outbound.answer(id2, id, answer);
                }).andFinally();
                value = Returns.value(using);
            }
            return value;
        }
    }

    public GridApplicationMessageHandler(Id id, HashRing<Id> hashRing, GridActorControl.Inbound inbound, GridActorControl.Outbound outbound, HardRefHolder hardRefHolder, Scheduler scheduler) {
        this(id, hashRing, inbound, outbound, new JavaObjectDecoder(), hardRefHolder, scheduler);
    }

    public GridApplicationMessageHandler(Id id, HashRing<Id> hashRing, GridActorControl.Inbound inbound, GridActorControl.Outbound outbound, Decoder decoder, HardRefHolder hardRefHolder, Scheduler scheduler) {
        this.isClusterHealthy = new AtomicBoolean(false);
        this.buffer = new WeakQueue();
        this.localNode = id;
        this.hashRing = hashRing;
        this.inbound = inbound;
        this.outbound = outbound;
        this.decoder = decoder;
        this.holder = hardRefHolder;
        this.scheduler = scheduler;
        this.visitor = new ControlMessageVisitor();
    }

    @Override // io.vlingo.xoom.lattice.grid.application.ApplicationMessageHandler
    public void informNodeIsHealthy(Id id, boolean z) {
        this.isClusterHealthy.set(z);
        if (z) {
            disburse(id);
        }
    }

    @Override // io.vlingo.xoom.lattice.grid.application.ApplicationMessageHandler
    public void handle(RawMessage rawMessage) {
        try {
            Message decode = this.decoder.decode(rawMessage.asBinaryMessage());
            Id of = Id.of(rawMessage.header().nodeId());
            logger.debug("Buffering message {} from {}", decode, of);
            Runnable runnable = () -> {
                logger.debug("Handling message {} from {}", decode, of);
                decode.accept(this.localNode, of, this.visitor);
            };
            if (this.isClusterHealthy.get()) {
                runnable.run();
            } else {
                this.buffer.offer(runnable);
            }
            if (Objects.nonNull(this.holder)) {
                this.holder.holdOnTo(runnable);
            }
        } catch (Exception e) {
            logger.error(String.format("Failed to process message %s", rawMessage), e);
        }
    }

    private void disburse(Id id) {
        Runnable poll;
        if (id.equals(this.localNode) && this.buffer.size() != 0) {
            logger.debug("Disbursing {} buffered messages", Integer.valueOf(this.buffer.size()));
            do {
                poll = this.buffer.poll();
                if (poll != null) {
                    poll.run();
                }
            } while (poll != null);
        }
    }
}
