package io.vlingo.xoom.lattice.grid.application;

import io.vlingo.xoom.actors.Actor;
import io.vlingo.xoom.actors.ActorInstantiator;
import io.vlingo.xoom.actors.Address;
import io.vlingo.xoom.actors.Definition;
import io.vlingo.xoom.actors.Returns;
import io.vlingo.xoom.common.SerializableConsumer;
import io.vlingo.xoom.lattice.grid.application.GridActorControl;
import io.vlingo.xoom.lattice.grid.application.message.Answer;
import io.vlingo.xoom.lattice.grid.application.message.Deliver;
import io.vlingo.xoom.lattice.grid.application.message.Encoder;
import io.vlingo.xoom.lattice.grid.application.message.Forward;
import io.vlingo.xoom.lattice.grid.application.message.Message;
import io.vlingo.xoom.lattice.grid.application.message.Relocate;
import io.vlingo.xoom.lattice.grid.application.message.Start;
import io.vlingo.xoom.lattice.grid.application.message.UnAckMessage;
import io.vlingo.xoom.lattice.grid.application.message.serialization.FSTEncoder;
import io.vlingo.xoom.lattice.util.OutBuffers;
import io.vlingo.xoom.wire.fdx.outbound.ApplicationOutboundStream;
import io.vlingo.xoom.wire.message.RawMessage;
import io.vlingo.xoom.wire.node.Id;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Outbound implementation of {@link GridActorControl}.
 *
 * <p>Every control operation is wrapped in a {@link Message} and a send task for it is
 * enqueued in the {@link OutBuffers} under the target node's {@link Id}. Nothing is
 * written to the wire until {@link #disburse(Id)} is called for that node; at that point
 * each buffered task encodes its message and hands it to the
 * {@link ApplicationOutboundStream}.
 *
 * <p>The stream may be {@code null} at construction time (see the four-argument
 * constructor); in that case {@link #useStream(ApplicationOutboundStream)} has to be
 * called before buffered tasks are disbursed, because the tasks dereference the stream
 * when they run.
 */
public class OutboundGridActorControl extends Actor implements GridActorControl.Outbound {
    private static final Logger logger = LoggerFactory.getLogger(OutboundGridActorControl.class);

    /** Id of the local node; its value is stamped on every outgoing {@link RawMessage}. */
    private final Id localNodeId;
    /** Stream used to transmit encoded messages; replaceable through {@link #useStream}. */
    private ApplicationOutboundStream stream;
    /** Turns a {@link Message} into the bytes placed in the raw message. */
    private final Encoder encoder;
    /** Receives the correlation id and the {@link UnAckMessage} of deliveries that carry a {@link Returns}. */
    private final BiConsumer<UUID, UnAckMessage> correlation;
    /** Per-node buffers of pending send tasks. */
    private final OutBuffers outBuffers;

    /**
     * {@link ActorInstantiator} that builds an {@link OutboundGridActorControl} without an
     * outbound stream, using the four-argument constructor.
     */
    public static class OutboundGridActorControlInstantiator implements ActorInstantiator<OutboundGridActorControl> {
        private static final long serialVersionUID = 8987209018742138417L;
        private final Id id;
        private final FSTEncoder fstEncoder;
        private final BiConsumer<UUID, UnAckMessage> correlation;
        private final OutBuffers outBuffers;

        /**
         * @param id the id passed to the actor as its local node id
         * @param fSTEncoder the encoder passed to the actor
         * @param biConsumer the correlation consumer passed to the actor
         * @param outBuffers the buffers passed to the actor
         */
        public OutboundGridActorControlInstantiator(Id id, FSTEncoder fSTEncoder, BiConsumer<UUID, UnAckMessage> biConsumer, OutBuffers outBuffers) {
            this.id = id;
            this.fstEncoder = fSTEncoder;
            this.correlation = biConsumer;
            this.outBuffers = outBuffers;
        }

        /**
         * Creates the actor instance from the values captured by this instantiator.
         *
         * @return a new {@link OutboundGridActorControl} with no stream assigned yet
         */
        @Override
        public OutboundGridActorControl instantiate() {
            return new OutboundGridActorControl(id, fstEncoder, correlation, outBuffers);
        }

        /**
         * Decompiler-generated alias of {@link #instantiate()}, retained only so that code
         * referring to the synthesized name keeps compiling.
         *
         * @return the result of {@link #instantiate()}
         * @deprecated use {@link #instantiate()}
         */
        @Deprecated
        public OutboundGridActorControl m19instantiate() {
            return instantiate();
        }
    }

    /**
     * Creates a control without an outbound stream; one must be supplied later through
     * {@link #useStream(ApplicationOutboundStream)}.
     *
     * @param id the local node id
     * @param encoder the message encoder
     * @param biConsumer the correlation consumer
     * @param outBuffers the per-node buffers of pending send tasks
     */
    public OutboundGridActorControl(Id id, Encoder encoder, BiConsumer<UUID, UnAckMessage> biConsumer, OutBuffers outBuffers) {
        this(id, null, encoder, biConsumer, outBuffers);
    }

    /**
     * Creates a control with every collaborator supplied.
     *
     * @param id the local node id
     * @param applicationOutboundStream the stream to send on; may be {@code null} if set later
     * @param encoder the message encoder
     * @param biConsumer the correlation consumer
     * @param outBuffers the per-node buffers of pending send tasks
     */
    public OutboundGridActorControl(Id id, ApplicationOutboundStream applicationOutboundStream, Encoder encoder, BiConsumer<UUID, UnAckMessage> biConsumer, OutBuffers outBuffers) {
        this.localNodeId = id;
        this.stream = applicationOutboundStream;
        this.encoder = encoder;
        this.correlation = biConsumer;
        this.outBuffers = outBuffers;
    }

    /**
     * Runs, in queue order, every send task currently buffered for the given node,
     * stopping once the queue reports no further task.
     *
     * @param id the node whose buffered tasks are executed
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    public void disburse(Id id) {
        final Queue<Runnable> pending = this.outBuffers.queue(id);
        logger.debug("Disbursing buffered messages");
        for (Runnable task = pending.poll(); task != null; task = pending.poll()) {
            task.run();
        }
    }

    /**
     * Buffers a task that, when run, encodes {@code message}, wraps the bytes in a
     * {@link RawMessage} stamped with the local node id, and sends it to {@code recipient}.
     * Encoding and the read of the {@code stream} field both happen when the task runs,
     * not when it is enqueued.
     */
    private void send(Id recipient, Message message) {
        logger.debug("Buffering message {} to {}", message, recipient);
        this.outBuffers.enqueue(recipient, () -> {
            logger.debug("Sending message {} to {}", message, recipient);
            final byte[] payload = this.encoder.encode(message);
            // NOTE(review): the meaning of the -1 argument is defined by RawMessage.from - confirm there.
            final RawMessage raw = RawMessage.from(this.localNodeId.value(), -1, payload.length);
            raw.putRemaining(ByteBuffer.wrap(payload));
            this.stream.sendTo(raw, recipient);
        });
    }

    /**
     * Buffers a {@link Start} message for node {@code id}.
     *
     * @param id the node the message is sent to
     * @param id2 not used by this implementation
     * @param cls passed to the {@link Start} message
     * @param address passed to the {@link Start} message
     * @param serializationProxy passed to the {@link Start} message
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    public <T> void start(Id id, Id id2, Class<T> cls, Address address, Definition.SerializationProxy serializationProxy) {
        send(id, new Start(cls, address, serializationProxy));
    }

    /**
     * Buffers a {@link Deliver} message for node {@code id}.
     *
     * <p>When {@code returns} is non-null a random {@link UUID} is generated, embedded in
     * the {@link Deliver}, and handed to the correlation consumer together with an
     * {@link UnAckMessage} built from {@code id}, {@code returns} and the message. When
     * {@code returns} is {@code null} no correlation is registered.
     *
     * @param id the node the message is sent to
     * @param id2 not used by this implementation
     * @param returns the pending result holder, or {@code null} when none is expected
     * @param cls passed to the {@link Deliver} message
     * @param address passed to the {@link Deliver} message
     * @param serializationProxy passed to the {@link Deliver} message
     * @param serializableConsumer passed to the {@link Deliver} message
     * @param str passed to the {@link Deliver} message
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    public <T> void deliver(Id id, Id id2, Returns<?> returns, Class<T> cls, Address address, Definition.SerializationProxy serializationProxy, SerializableConsumer<T> serializableConsumer, String str) {
        final Deliver message;
        if (returns != null) {
            final UUID correlationId = UUID.randomUUID();
            message = new Deliver(cls, address, serializationProxy, serializableConsumer, correlationId, str);
            this.correlation.accept(correlationId, new UnAckMessage(id, returns, message));
        } else {
            message = new Deliver(cls, address, serializationProxy, serializableConsumer, str);
        }
        send(id, message);
    }

    /**
     * Buffers the given {@link Answer} for node {@code id}.
     *
     * @param id the node the answer is sent to
     * @param id2 not used by this implementation
     * @param answer the message to send
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    public <T> void answer(Id id, Id id2, Answer<T> answer) {
        send(id, answer);
    }

    /**
     * Buffers a {@link Forward} wrapping {@code message} for node {@code id}.
     *
     * @param id the node the forward is sent to
     * @param id2 the id placed in the {@link Forward} alongside the message
     * @param message the message being forwarded
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    public void forward(Id id, Id id2, Message message) {
        send(id, new Forward(id2, message));
    }

    /**
     * Buffers a {@link Relocate} message for node {@code id}. Each element of
     * {@code list} is mapped through {@code Deliver.from(correlation, id)} and the
     * collected results are passed to the {@link Relocate}.
     *
     * @param id the node the message is sent to
     * @param id2 not used by this implementation
     * @param serializationProxy passed to the {@link Relocate} message
     * @param address passed to the {@link Relocate} message
     * @param obj passed to the {@link Relocate} message
     * @param list the actor messages converted and carried along
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw List kept: generic signatures of Deliver.from/Relocate are not visible here
    public void relocate(Id id, Id id2, Definition.SerializationProxy serializationProxy, Address address, Object obj, List<? extends io.vlingo.xoom.actors.Message> list) {
        final List converted = (List) list.stream()
                .map(Deliver.from(this.correlation, id))
                .collect(Collectors.toList());
        send(id, new Relocate(address, serializationProxy, obj, converted));
    }

    /**
     * Replaces the stream used by send tasks. Tasks read the field when they run, so
     * tasks buffered earlier but not yet disbursed will use the new stream.
     *
     * @param applicationOutboundStream the stream to send on from now on
     */
    @Override // io.vlingo.xoom.lattice.grid.application.GridActorControl.Outbound
    public void useStream(ApplicationOutboundStream applicationOutboundStream) {
        this.stream = applicationOutboundStream;
    }
}
