package se.sics.kompics.network.data;

import com.lkroll.common.Either;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import se.sics.kompics.ComponentDefinition;
import se.sics.kompics.Handler;
import se.sics.kompics.Negative;
import se.sics.kompics.Positive;
import se.sics.kompics.Start;
import se.sics.kompics.network.Header;
import se.sics.kompics.network.MessageNotify;
import se.sics.kompics.network.Msg;
import se.sics.kompics.network.Network;
import se.sics.kompics.network.Transport;
import se.sics.kompics.timer.CancelPeriodicTimeout;
import se.sics.kompics.timer.SchedulePeriodicTimeout;
import se.sics.kompics.timer.ScheduleTimeout;
import se.sics.kompics.timer.Timeout;
import se.sics.kompics.timer.Timer;

/* loaded from: input_file:se/sics/kompics/network/data/DataStreamInterceptor.class */
/**
 * Component placed between an upper ({@code netUp}) and a lower ({@code netDown})
 * {@link Network} port that handles messages sent with {@link Transport#DATA}.
 *
 * <p>Incoming {@link Msg} and {@link MessageNotify.Req} events are queued on a
 * per-destination {@link ConnectionTracker}. Whenever the tracker allows it, queued
 * messages are given a concrete transport by the tracker's selection policy and are
 * forwarded downstream wrapped in a delivery-notifying {@link MessageNotify.Req}.
 * Responses coming back from below are matched against the {@code outstanding} map,
 * fed into the connection statistics and, where the sender asked for a notification,
 * answered on the original request.
 *
 * <p>A periodic {@link StatsTimeout} triggers {@code update()} on every known connection.
 */
public class DataStreamInterceptor extends ComponentDefinition {
    static final Logger EXT_LOG = LoggerFactory.getLogger(DataStreamInterceptor.class);

    final Positive<Timer> timer = requires(Timer.class);
    final Positive<Network> netDown = requires(Network.class);
    final Negative<Network> netUp = provides(Network.class);

    /** Messages handed to {@code netDown} that have not reached a final notify state yet, by notify id. */
    private final Map<UUID, TrackedMessage> outstanding = new HashMap<>();
    /** Id of the periodic stats timeout; {@code null} until the component has been started. */
    private UUID timeoutId = null;
    /** One tracker per destination socket, created lazily through {@link #factory}. */
    private final HashMap<InetSocketAddress, ConnectionTracker> connections = new HashMap<>();

    /** Limit passed to {@code ConnectionTracker.canSend}; read from {@code kompics.net.data.queueLength}. */
    private final long maxQueueLength = ((Long) config().getValue("kompics.net.data.queueLength", Long.class)).longValue();
    private final ConnectionFactory factory = new ConnectionFactory(config(), config().readValue("kompics.net.data.ratioPolicy", String.class), config().readValue("kompics.net.data.selectionPolicy", String.class));

    /** Schedules the periodic {@link StatsTimeout} (1000/1000 delay and period) and remembers its id. */
    Handler<Start> startHandler = new Handler<Start>() {
        @Override
        public void handle(Start start) {
            SchedulePeriodicTimeout schedule = new SchedulePeriodicTimeout(1000L, 1000L);
            StatsTimeout statsTimeout = new StatsTimeout(schedule);
            schedule.setTimeoutEvent(statsTimeout);
            DataStreamInterceptor.this.trigger(schedule, DataStreamInterceptor.this.timer);
            DataStreamInterceptor.this.timeoutId = statsTimeout.getTimeoutId();
        }
    };

    /** Queues a plain message on its destination's tracker and tries to send. */
    Handler<Msg> msgHandler = new Handler<Msg>() {
        @Override
        public void handle(Msg msg) {
            ConnectionTracker tracker = DataStreamInterceptor.this.trackerFor(msg.getHeader());
            // Enqueue the message itself; tryToSend() later dequeues it as the right side of an Either.
            tracker.enqueue(msg);
            DataStreamInterceptor.this.tryToSend(tracker);
        }
    };

    /** Queues a notify request on its destination's tracker and tries to send. */
    Handler<MessageNotify.Req> reqHandler = new Handler<MessageNotify.Req>() {
        @Override
        public void handle(MessageNotify.Req req) {
            ConnectionTracker tracker = DataStreamInterceptor.this.trackerFor(req.msg.getHeader());
            tracker.enqueue(req);
            DataStreamInterceptor.this.tryToSend(tracker);
        }
    };

    /** Processes notify responses coming back from the lower network port. */
    Handler<MessageNotify.Resp> respHandler = new Handler<MessageNotify.Resp>() {
        @Override
        public void handle(MessageNotify.Resp resp) {
            DataStreamInterceptor.this.onNotifyResponse(resp);
        }
    };

    /** Lets every known connection tracker update itself on each stats timeout. */
    Handler<StatsTimeout> timeoutHandler = new Handler<StatsTimeout>() {
        @Override
        public void handle(StatsTimeout statsTimeout) {
            for (ConnectionTracker tracker : DataStreamInterceptor.this.connections.values()) {
                tracker.update();
            }
        }
    };

    /** Timeout event used for the periodic statistics update. */
    public static class StatsTimeout extends Timeout {
        public StatsTimeout(ScheduleTimeout scheduleTimeout) {
            super(scheduleTimeout);
        }

        public StatsTimeout(SchedulePeriodicTimeout schedulePeriodicTimeout) {
            super(schedulePeriodicTimeout);
        }
    }

    /** Bookkeeping record for a message that has been handed to the lower network port. */
    public static class TrackedMessage {
        /** The message as received from above (before any wrapping). */
        public final Msg msg;
        /** {@code System.currentTimeMillis()} taken when the message was forwarded. */
        public final long ts;
        /** The sender's notify request, if the message arrived as one. */
        public final Optional<MessageNotify.Req> originalRequest;
        /** Tracker of the connection the message was sent on. */
        public final ConnectionTracker connection;

        TrackedMessage(Msg msg, long j, Optional<MessageNotify.Req> optional, ConnectionTracker connectionTracker) {
            this.msg = msg;
            this.ts = j;
            this.originalRequest = optional;
            this.connection = connectionTracker;
        }
    }

    /** Subscribes all handlers to their ports. */
    public DataStreamInterceptor() {
        subscribe(this.startHandler, this.control);
        subscribe(this.msgHandler, this.netUp);
        subscribe(this.reqHandler, this.netUp);
        subscribe(this.respHandler, this.netDown);
        subscribe(this.timeoutHandler, this.timer);
    }

    /**
     * Validates that the header uses {@link Transport#DATA} and returns the tracker for its
     * destination, creating and caching one through the factory on first use.
     *
     * @throws RuntimeException if the header's protocol is not {@code DATA}
     */
    private ConnectionTracker trackerFor(Header header) {
        if (header.getProtocol() != Transport.DATA) {
            throw new RuntimeException("Invalid protocol: " + header.getProtocol());
        }
        InetSocketAddress destination = header.getDestination().asSocket();
        ConnectionTracker tracker = this.connections.get(destination);
        if (tracker == null) {
            tracker = this.factory.findConnection(destination);
            this.connections.put(destination, tracker);
        }
        return tracker;
    }

    /**
     * Matches a notify response against the outstanding messages and reacts to its state:
     * <ul>
     * <li>{@code SENT}: answers the original request (if any), marks the message as sent on
     * the connection and tries to send more;</li>
     * <li>{@code DELIVERED}: updates the connection statistics, optionally answers with a
     * delivery response, and stops tracking the message;</li>
     * <li>any other state: treated as a failure — answers the original request, stops
     * tracking, marks as sent and tries to send more.</li>
     * </ul>
     * Responses for unknown message ids are logged and ignored.
     */
    private void onNotifyResponse(MessageNotify.Resp resp) {
        if (!resp.isSuccess()) {
            this.logger.warn("Could not send message: {}", resp.msgId);
        }
        TrackedMessage tracked = this.outstanding.get(resp.msgId);
        // Guard BEFORE touching any field of the tracked message; an unknown id must not crash the handler.
        if (tracked == null) {
            this.logger.warn("Got a response for an untracked message...something is probably wrong: \n {}", resp);
            return;
        }
        ConnectionTracker connection = tracked.connection;
        switch (resp.getState()) {
            case SENT:
                this.logger.trace("Got a sent notify: {}", resp);
                answerOriginalRequest(tracked, resp);
                connection.sent(resp.msgId);
                tryToSend(connection);
                return;
            case DELIVERED:
                this.logger.trace("Got a delivery notify: {}", resp);
                connection.stats.update(resp.getDeliveryTime() / 1.0E9d, resp.getSize());
                if (tracked.originalRequest.isPresent()) {
                    MessageNotify.Req original = tracked.originalRequest.get();
                    if (original.notifyOfDelivery) {
                        original.injectSize(resp.getSize(), 0L);
                        answer(original, original.deliveryResponse(resp.getTime(), resp.isSuccess(), resp.getDeliveryTime()));
                    }
                }
                this.outstanding.remove(resp.msgId);
                return;
            default:
                this.logger.trace("Got a notify with a failure state: {}", resp);
                answerOriginalRequest(tracked, resp);
                this.outstanding.remove(resp.msgId);
                connection.sent(resp.msgId);
                tryToSend(connection);
                return;
        }
    }

    /** Answers the sender's original notify request, if there was one, with the send result from {@code resp}. */
    private void answerOriginalRequest(TrackedMessage tracked, MessageNotify.Resp resp) {
        if (!tracked.originalRequest.isPresent()) {
            return;
        }
        MessageNotify.Req original = tracked.originalRequest.get();
        original.injectSize(resp.getSize(), 0L);
        original.prepareResponse(resp.getTime(), resp.isSuccess(), resp.getSendTime());
        answer(original);
    }

    /**
     * Drains the connection's queue for as long as the tracker permits sending.
     *
     * <p>Each dequeued entry is either a notify request (left) or a plain message (right).
     * The selection policy picks the transport; messages with a {@link DataHeader} get their
     * protocol replaced in place, all others are wrapped in a {@link DataMsgWrapper}. The
     * message is then forwarded on {@code netDown} with delivery notification and recorded
     * in {@code outstanding} under the new request's id.
     *
     * @param connectionTracker tracker of the connection to send on
     */
    public void tryToSend(ConnectionTracker connectionTracker) {
        while (connectionTracker.canSend(this.maxQueueLength)) {
            Either<?, ?> next = connectionTracker.dequeue();
            Msg msg;
            Optional<MessageNotify.Req> originalRequest;
            if (next.isLeft()) {
                MessageNotify.Req req = (MessageNotify.Req) next.getLeft();
                msg = req.msg;
                originalRequest = Optional.of(req);
            } else {
                msg = (Msg) next.getRight();
                originalRequest = Optional.empty();
            }
            Header header = msg.getHeader();
            Transport selected = connectionTracker.selectionPolicy.select(msg);
            connectionTracker.stats.updateSelection(selected);
            this.logger.trace("Got DATA message over {} to track: {}", selected, msg);
            long now = System.currentTimeMillis();
            MessageNotify.Req outgoing;
            if (header instanceof DataHeader) {
                ((DataHeader) header).replaceProtocol(selected);
                outgoing = MessageNotify.createWithDeliveryNotification(msg);
            } else {
                outgoing = MessageNotify.createWithDeliveryNotification(new DataMsgWrapper(msg, selected));
            }
            // Register first so that any response can always be matched to its tracked message.
            this.outstanding.put(outgoing.getMsgId(), new TrackedMessage(msg, now, originalRequest, connectionTracker));
            trigger(outgoing, this.netDown);
        }
    }

    /** Cancels the periodic stats timeout, if one was scheduled. */
    public void tearDown() {
        this.logger.debug("Cleaning up timeouts.");
        if (this.timeoutId != null) {
            trigger(new CancelPeriodicTimeout(this.timeoutId), this.timer);
        }
    }
}
