package io.opencmw.client.cmwlight;

import io.opencmw.client.DataSource;
import io.opencmw.client.Endpoint;
import io.opencmw.client.cmwlight.CmwLightMessage;
import io.opencmw.client.cmwlight.CmwLightProtocol;
import io.opencmw.client.cmwlight.DirectoryLightClient;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.CmwLightSerialiser;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;
import org.zeromq.ZMsg;

/* loaded from: input_file:io/opencmw/client/cmwlight/CmwLightDataSource.class */
public class CmwLightDataSource extends DataSource {
    /** URI scheme prefix of the endpoints handled by this data source; {@link #FACTORY} matches on it and {@code connect()} strips it from the address. */
    public static final String RDA_3_PROTOCOL = "rda3://";
    // NOTE(review): presumably the heartbeat period in milliseconds -- confirm against the protocol specification
    protected static final int HEARTBEAT_INTERVAL = 1000;
    // NOTE(review): presumably the number of heartbeat periods that may pass without a server message -- confirm
    protected static final int HEARTBEAT_ALLOWED_MISSES = 3;
    /** Milliseconds added to the current time in {@code sendSubscribe()} to compute the deadline of a pending subscribe request. */
    protected static final long SUBSCRIPTION_TIMEOUT = 1000;
    /** Shared directory client used by {@code connect()} to look up devices whose address contains no ':'. */
    private static DirectoryLightClient directoryLightClient;
    /** Counter incremented every time {@code getIdentity()} is called. */
    protected final AtomicInteger channelId;
    /** ZeroMQ context handed to the constructor; the socket is created from it. */
    protected final ZContext context;
    /** DEALER socket used for all traffic of this data source. */
    protected final ZMQ.Socket socket;
    /** Current connection state; starts as DISCONNECTED. */
    protected final AtomicReference<ConnectionState> connectionState;
    /** Address part of the endpoint passed to the constructor. */
    private final String address;
    /** Session id string passed along with every get/set/subscribe/unsubscribe request. */
    protected final String sessionId;
    /** Id taken from CONNECTION_ID_GENERATOR in {@code getIdentity()}; 0 until then. */
    protected long connectionId;
    /** Subscriptions keyed by {@code Subscription.id}. */
    protected final Map<Long, Subscription> subscriptions;
    /** Subscriptions keyed by the caller supplied request id ({@code Subscription.idString}). */
    protected final Map<String, Subscription> subscriptionsByReqId;
    /** Subscriptions keyed by the update id reported in the server's subscribe acknowledgement. */
    protected final Map<Long, Subscription> replyIdMap;
    /** {@code System.currentTimeMillis()} value of the last connect-ack, heartbeat or reply accepted from the server; -1 initially. */
    protected long lastHbReceived;
    /** {@code System.currentTimeMillis()} value of the last heartbeat sent or connection attempt started; -1 initially. */
    protected long lastHbSent;
    /** Reconnect delay in milliseconds; doubled on connection failures and reset to 20 on a connect acknowledgement. */
    protected int backOff;
    /** Get/set requests waiting to be sent by {@code housekeeping()}. */
    private final Queue<Request> queuedRequests;
    /** Requests that have been sent and await a reply, keyed by {@code Request.id}. */
    private final Map<Long, Request> pendingRequests;
    /** "tcp://" address the socket was last connected to; empty before the first connection attempt. */
    private String connectedAddress;
    private static final Logger LOGGER = LoggerFactory.getLogger(CmwLightDataSource.class);
    /** Source of {@link #connectionId} values, shared by all instances. */
    private static final AtomicLong CONNECTION_ID_GENERATOR = new AtomicLong(0);
    /** Source of the ids of both {@link Request} and {@link Subscription} objects, shared by all instances. */
    private static final AtomicInteger REQUEST_ID_GENERATOR = new AtomicInteger(0);
    /**
     * Factory that claims every endpoint starting with {@link #RDA_3_PROTOCOL}, pairs it with the
     * {@link CmwLightSerialiser} and creates {@link CmwLightDataSource} instances for it.
     */
    public static final DataSource.Factory FACTORY = new DataSource.Factory() {
        /** @return {@code true} if the endpoint string starts with the rda3 scheme prefix */
        @Override
        public boolean matches(String endpoint) {
            return endpoint.startsWith(CmwLightDataSource.RDA_3_PROTOCOL);
        }

        /** @return the serialiser class for this protocol, independent of the endpoint */
        @Override
        public Class<? extends IoSerialiser> getMatchingSerialiserType(String endpoint) {
            return CmwLightSerialiser.class;
        }

        /** Creates a new data source; the {@link Duration} argument is not used by this implementation. */
        @Override
        public DataSource newInstance(ZContext zmqContext, String endpoint, Duration unusedDuration, String clientId) {
            return new CmwLightDataSource(zmqContext, endpoint, clientId);
        }
    };

    /* loaded from: input_file:io/opencmw/client/cmwlight/CmwLightDataSource$ConnectionState.class */
    /** Connection life cycle of a {@link CmwLightDataSource}. */
    public enum ConnectionState {
        /** No connection; {@code housekeeping()} calls {@code connect()} once the back-off has elapsed. */
        DISCONNECTED,
        /** A connect message has been sent and the server's acknowledgement is awaited. */
        CONNECTING,
        /** The server acknowledged the connection; requests, heartbeats and subscriptions are processed. */
        CONNECTED
    }

    /* loaded from: input_file:io/opencmw/client/cmwlight/CmwLightDataSource$Request.class */
    /**
     * A single get or set request. Instances are queued by {@code get()}/{@code set()} and sent
     * by {@code housekeeping()}; the numeric {@link #id} is used to match the server's reply.
     */
    public static class Request {
        public final byte[] filters;
        public final byte[] data;
        /** Id drawn from the shared request id generator when the request is created. */
        public final long id;
        private final String requestId;
        private final String endpoint;
        private final byte[] rbacToken;
        public final CmwLightProtocol.RequestType requestType;

        /**
         * @param requestType kind of request (GET or SET are the ones handled when sending)
         * @param requestId   caller supplied id, echoed as first frame of the reply message
         * @param endpoint    endpoint string the request is addressed to
         * @param filters     stored as given
         * @param data        payload; must be non-null for SET requests
         * @param rbacToken   stored as given
         */
        public Request(CmwLightProtocol.RequestType requestType, String requestId, String endpoint, byte[] filters, byte[] data, byte[] rbacToken) {
            this.id = CmwLightDataSource.REQUEST_ID_GENERATOR.incrementAndGet();
            this.requestType = requestType;
            this.requestId = requestId;
            this.endpoint = endpoint;
            this.filters = filters;
            this.data = data;
            this.rbacToken = rbacToken;
        }
    }

    /* loaded from: input_file:io/opencmw/client/cmwlight/CmwLightDataSource$Subscription.class */
    /**
     * Book-keeping record for one subscription: the parsed endpoint parts, the current
     * {@link SubscriptionState}, the retry back-off and the ids used to match server messages.
     */
    public static class Subscription {
        public final String property;
        public final String device;
        public final String selector;
        public final Map<String, Object> filters;
        public final String endpoint;
        /** Id drawn from the shared request id generator; key in the {@code subscriptions} map. */
        private final long id = CmwLightDataSource.REQUEST_ID_GENERATOR.incrementAndGet();
        public SubscriptionState subscriptionState = SubscriptionState.UNSUBSCRIBED;
        /** Retry delay in milliseconds; doubled after failed attempts and reset to 20 on success. */
        public int backOff = 20;
        /** Update id reported by the server in the subscribe acknowledgement; -1 until then. */
        public long updateId = -1;
        /** Timestamp (ms) after which the next subscribe attempt or timeout handling is due; -1 initially. */
        public long timeoutValue = -1;
        /** Caller supplied request id; first frame of every message produced for this subscription. */
        public String idString = "";

        /**
         * @param endpoint full endpoint string of the subscription
         * @param device   device name
         * @param property property name
         * @param selector selector passed in the request context
         * @param filters  filters passed in the request context
         */
        public Subscription(String endpoint, String device, String property, String selector, Map<String, Object> filters) {
            this.endpoint = endpoint;
            this.property = property;
            this.device = device;
            this.selector = selector;
            this.filters = filters;
        }

        /** @return a diagnostic string listing each field under its own label */
        @Override
        public String toString() {
            // every value is printed under its matching label: updateId and timeoutValue used to show
            // the property name and the update id respectively
            return "Subscription{property='" + this.property + "', device='" + this.device + "', selector='" + this.selector
                    + "', filters=" + this.filters + ", subscriptionState=" + this.subscriptionState + ", backOff=" + this.backOff
                    + ", id=" + this.id + ", updateId=" + this.updateId + ", timeoutValue=" + this.timeoutValue + "}";
        }
    }

    /* loaded from: input_file:io/opencmw/client/cmwlight/CmwLightDataSource$SubscriptionState.class */
    /** Life cycle of a {@link Subscription}. */
    public enum SubscriptionState {
        /** Not subscribed (initial state); a subscribe request is sent once the subscription's timeout value has passed. */
        UNSUBSCRIBED,
        /** A subscribe request has been sent and the acknowledgement is awaited. */
        SUBSCRIBING,
        /** The server acknowledged the subscription. */
        SUBSCRIBED,
        /** {@code unsubscribe()} was called; an unsubscribe request is sent during housekeeping. */
        CANCELED,
        /** The unsubscribe request has been sent. */
        UNSUBSCRIBE_SENT
    }

    /**
     * Creates a data source for the given endpoint. The DEALER socket is created here, but no
     * connection is attempted until {@link #connect()} or {@link #housekeeping()} runs.
     *
     * @param zContext ZeroMQ context used to create the socket
     * @param str      endpoint string; its address part is kept as connection target
     * @param str2     client id handed to the session id generation
     */
    public CmwLightDataSource(ZContext zContext, String str, String str2) {
        super(str);
        // connection bookkeeping
        this.channelId = new AtomicInteger(0);
        this.connectionState = new AtomicReference<>(ConnectionState.DISCONNECTED);
        this.connectedAddress = "";
        this.lastHbReceived = -1L;
        this.lastHbSent = -1L;
        this.backOff = 20;
        // subscription and request bookkeeping
        this.subscriptions = new HashMap<>();
        this.subscriptionsByReqId = new HashMap<>();
        this.replyIdMap = new HashMap<>();
        this.queuedRequests = new LinkedBlockingQueue<>();
        this.pendingRequests = new HashMap<>();

        LOGGER.atTrace().addArgument(str).log("connecting to: {}");
        this.context = zContext;
        this.socket = zContext.createSocket(SocketType.DEALER);
        this.sessionId = getSessionId(str2);
        this.address = new Endpoint(str).getAddress();
    }

    /** @return the directory client shared by all instances, or {@code null} if none has been set */
    public static DirectoryLightClient getDirectoryLightClient() {
        return directoryLightClient;
    }

    /**
     * Sets the directory client shared by all instances; {@code connect()} uses it to look up
     * addresses that contain no ':'.
     *
     * @param directoryLightClient2 the client to use from now on
     */
    public static void setDirectoryLightClient(DirectoryLightClient directoryLightClient2) {
        directoryLightClient = directoryLightClient2;
    }

    /**
     * Reads one message from the socket without blocking and parses it.
     *
     * @return the parsed message, or {@code null} if nothing was received or parsing failed
     */
    public CmwLightMessage receiveData() {
        final ZMsg rawMessage = ZMsg.recvMsg(this.socket, ZMQ.DONTWAIT);
        if (rawMessage == null) {
            return null;
        }
        try {
            return CmwLightProtocol.parseMsg(rawMessage);
        } catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause(e).log("error parsing cmw light reply: ");
            return null;
        }
    }

    /**
     * Polls the socket once and processes whatever the server sent.
     *
     * @return {@code null} if nothing could be received or parsed; otherwise a message that is
     *         empty for pure protocol traffic (connect-ack, heartbeat, ignored messages) or
     *         carries the frames produced by the reply handling
     */
    @Override
    public ZMsg getMessage() {
        final long now = System.currentTimeMillis();
        final CmwLightMessage received = receiveData();
        if (received == null) {
            return null;
        }
        switch (received.messageType) {
            case SERVER_CONNECT_ACK:
                if (this.connectionState.get() == ConnectionState.CONNECTING) {
                    LOGGER.atTrace().addArgument(this.connectedAddress).log("Connected to server: {}");
                    this.connectionState.set(ConnectionState.CONNECTED);
                    this.lastHbReceived = now;
                    this.backOff = 20; // successful connection resets the reconnect delay
                } else {
                    LOGGER.atWarn().addArgument(received).log("ignoring unsolicited connection acknowledgement: {}");
                }
                return new ZMsg();
            case SERVER_HB:
                if (this.connectionState.get() == ConnectionState.CONNECTED) {
                    this.lastHbReceived = now;
                } else {
                    LOGGER.atWarn().addArgument(received).log("ignoring heartbeat received before connection established: {}");
                }
                return new ZMsg();
            case SERVER_REP:
                if (this.connectionState.get() == ConnectionState.CONNECTED) {
                    this.lastHbReceived = now; // any reply also counts as a sign of life
                    return handleServerReply(received, now);
                }
                LOGGER.atWarn().addArgument(received).log("ignoring data received before connection established: {}");
                return new ZMsg();
            default:
                // CLIENT_CONNECT, CLIENT_REQ, CLIENT_HB and anything else
                LOGGER.atWarn().addArgument(received).log("ignoring client message from server: {}");
                return new ZMsg();
        }
    }

    /**
     * Translates a server reply into the message handed back by {@link #getMessage()} and updates
     * the request/subscription bookkeeping.
     *
     * <p>Data and exception replies produce five frames: id string, endpoint, empty frame,
     * body (or empty frame), empty frame (or exception text). Subscribe/unsubscribe
     * acknowledgements only update internal state and yield an empty message. Replies that refer
     * to an unknown request or subscription id are logged and ignored instead of failing with a
     * {@link NullPointerException}.
     *
     * @param cmwLightMessage the parsed server reply
     * @param j               current time in milliseconds, used to schedule subscription retries
     * @return the message to pass on; empty if there is nothing to deliver
     */
    private ZMsg handleServerReply(CmwLightMessage cmwLightMessage, long j) {
        final ZMsg reply = new ZMsg();
        final Long messageId = Long.valueOf(cmwLightMessage.id);
        switch (cmwLightMessage.requestType) {
            case REPLY: {
                final Request request = this.pendingRequests.remove(messageId);
                if (request == null) {
                    LOGGER.atWarn().addArgument(cmwLightMessage).log("ignoring reply for unknown request: {}");
                    return reply;
                }
                return addDataFrames(reply, request.requestId, request.endpoint, cmwLightMessage);
            }
            case EXCEPTION: {
                final Request request = this.pendingRequests.remove(messageId);
                if (request == null) {
                    LOGGER.atWarn().addArgument(cmwLightMessage).log("ignoring exception reply for unknown request: {}");
                    return reply;
                }
                return addExceptionFrames(reply, request.requestId, request.endpoint, cmwLightMessage);
            }
            case SUBSCRIBE: {
                final Subscription subscription = this.subscriptions.get(messageId);
                if (subscription == null) {
                    LOGGER.atWarn().addArgument(cmwLightMessage).log("ignoring subscribe acknowledgement for unknown subscription: {}");
                    return reply;
                }
                // the server announces the id under which notifications for this subscription will arrive
                subscription.updateId = ((Long) cmwLightMessage.options.get(CmwLightProtocol.FieldName.SOURCE_ID_TAG.value())).longValue();
                this.replyIdMap.put(Long.valueOf(subscription.updateId), subscription);
                subscription.subscriptionState = SubscriptionState.SUBSCRIBED;
                LOGGER.atDebug().addArgument(subscription.device).addArgument(subscription.property).log("subscription successful: {}/{}");
                subscription.backOff = 20;
                return reply;
            }
            case UNSUBSCRIBE: {
                final Subscription subscription = this.subscriptions.remove(messageId);
                if (subscription == null) {
                    LOGGER.atWarn().addArgument(cmwLightMessage).log("ignoring unsubscribe acknowledgement for unknown subscription: {}");
                    return reply;
                }
                this.subscriptionsByReqId.remove(subscription.idString);
                this.replyIdMap.remove(Long.valueOf(subscription.updateId));
                return reply;
            }
            case NOTIFICATION_DATA: {
                final Subscription subscription = this.replyIdMap.get(messageId);
                if (subscription == null) {
                    LOGGER.atInfo().addArgument(cmwLightMessage.toString()).log("Got unsolicited subscription data: {}");
                    return reply;
                }
                return addDataFrames(reply, subscription.idString, subscription.endpoint, cmwLightMessage);
            }
            case NOTIFICATION_EXC: {
                final Subscription subscription = this.replyIdMap.get(messageId);
                if (subscription == null) {
                    LOGGER.atInfo().addArgument(cmwLightMessage.toString()).log("Got unsolicited subscription notification error: {}");
                    return reply;
                }
                return addExceptionFrames(reply, subscription.idString, subscription.endpoint, cmwLightMessage);
            }
            case SUBSCRIBE_EXCEPTION: {
                final Subscription subscription = this.subscriptions.get(messageId);
                if (subscription == null) {
                    LOGGER.atWarn().addArgument(cmwLightMessage).log("ignoring subscribe exception for unknown subscription: {}");
                    return reply;
                }
                // back to UNSUBSCRIBED so that housekeeping retries after the (doubling) back-off
                subscription.subscriptionState = SubscriptionState.UNSUBSCRIBED;
                subscription.timeoutValue = j + subscription.backOff;
                subscription.backOff *= 2;
                LOGGER.atDebug().addArgument(subscription.device).addArgument(subscription.property).log("exception during subscription, retrying: {}/{}");
                return addExceptionFrames(reply, subscription.idString, subscription.endpoint, cmwLightMessage);
            }
            default:
                // GET, SET, CONNECT, EVENT, SESSION_CONFIRM and anything else: nothing to deliver
                return reply;
        }
    }

    /** Appends the five frames of a data reply: id, context-specific endpoint, empty, body, empty. */
    private static ZMsg addDataFrames(ZMsg target, String id, String endpoint, CmwLightMessage source) {
        target.add(id);
        target.add(new Endpoint(endpoint).getEndpointForContext(source.dataContext.cycleName));
        target.add(new ZFrame(new byte[0]));
        target.add(source.bodyData);
        target.add(new ZFrame(new byte[0]));
        return target;
    }

    /** Appends the five frames of an error reply: id, endpoint, empty, empty, exception text. */
    private static ZMsg addExceptionFrames(ZMsg target, String id, String endpoint, CmwLightMessage source) {
        target.add(id);
        target.add(endpoint);
        target.add(new ZFrame(new byte[0]));
        target.add(new ZFrame(new byte[0]));
        target.add(source.exceptionMessage.message);
        return target;
    }

    /**
     * Queues a GET request; it is sent by the next {@link #housekeeping()} run while connected.
     *
     * @param requestId caller supplied id that is echoed in the reply
     * @param endpoint  endpoint to read from
     * @param filters   stored with the request
     * @param data      stored with the request
     * @param rbacToken stored with the request
     */
    @Override
    public void get(String requestId, String endpoint, byte[] filters, byte[] data, byte[] rbacToken) {
        final Request getRequest = new Request(CmwLightProtocol.RequestType.GET, requestId, endpoint, filters, data, rbacToken);
        this.queuedRequests.add(getRequest);
    }

    /**
     * Queues a SET request; it is sent by the next {@link #housekeeping()} run while connected.
     *
     * @param requestId caller supplied id that is echoed in the reply
     * @param endpoint  endpoint to write to
     * @param filters   stored with the request
     * @param data      payload to send; must not be {@code null} when the request is sent
     * @param rbacToken stored with the request
     */
    @Override
    public void set(String requestId, String endpoint, byte[] filters, byte[] data, byte[] rbacToken) {
        final Request setRequest = new Request(CmwLightProtocol.RequestType.SET, requestId, endpoint, filters, data, rbacToken);
        this.queuedRequests.add(setRequest);
    }

    /**
     * Registers a subscription for the given endpoint. Nothing is sent here; the subscribe
     * request goes out from {@link #housekeeping()}.
     *
     * @param reqId     caller supplied id, used as key for {@link #unsubscribe(String)} and as first frame of notifications
     * @param endpoint  endpoint to subscribe to
     * @param rbacToken not used by this implementation
     */
    @Override
    public void subscribe(String reqId, String endpoint, byte[] rbacToken) {
        final Endpoint parsed = new Endpoint(endpoint);
        final String device = parsed.getDevice();
        final String property = parsed.getProperty();
        final String selector = parsed.getSelector();
        final Subscription created = new Subscription(endpoint, device, property, selector, parsed.getFilters());
        created.idString = reqId;
        // index by numeric id (server acknowledgements) and by request id (unsubscribe calls)
        this.subscriptions.put(created.id, created);
        this.subscriptionsByReqId.put(reqId, created);
    }

    /**
     * Marks the subscription registered under the given request id as canceled; the unsubscribe
     * request itself is sent during {@link #housekeeping()}. Unknown request ids are logged and
     * ignored rather than failing with a {@link NullPointerException}.
     *
     * @param str request id that was passed to {@code subscribe}
     */
    @Override
    public void unsubscribe(String str) {
        final Subscription subscription = this.subscriptionsByReqId.get(str);
        if (subscription == null) {
            LOGGER.atWarn().addArgument(str).log("ignoring unsubscribe for unknown subscription request id: {}");
            return;
        }
        subscription.subscriptionState = SubscriptionState.CANCELED;
    }

    /** @return the current connection state */
    public ConnectionState getConnectionState() {
        return this.connectionState.get();
    }

    /** @return the ZeroMQ context that was passed to the constructor */
    public ZContext getContext() {
        return this.context;
    }

    /** @return the DEALER socket created in the constructor */
    @Override // io.opencmw.client.DataSource
    public ZMQ.Socket getSocket() {
        return this.socket;
    }

    /** @return the shared {@link #FACTORY} instance */
    @Override // io.opencmw.client.DataSource
    protected DataSource.Factory getFactory() {
        return FACTORY;
    }

    /**
     * Builds the socket identity for a new connection attempt and, as a side effect, assigns a
     * fresh {@link #connectionId} and increments {@link #channelId}.
     *
     * @return identity of the form {@code hostname/pid/connectionId/channelId}; the host name
     *         falls back to {@code "localhost"} if it cannot be determined
     */
    private String getIdentity() {
        String hostName;
        try {
            hostName = InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException ignored) {
            // the identity only needs some host label, so an unresolvable local host is not fatal
            hostName = "localhost";
        }
        final long pid = ProcessHandle.current().pid();
        this.connectionId = CONNECTION_ID_GENERATOR.incrementAndGet();
        final int channel = this.channelId.incrementAndGet();
        // the last two segments are the connection and channel id (the host name used to be
        // repeated and the incremented channel id dropped)
        return hostName + "/" + pid + "/" + this.connectionId + "/" + channel;
    }

    /**
     * Builds the session id string sent with every request.
     *
     * @param str client id to embed in the session id
     * @return a string of the form {@code cmwLightClient{pid=..., conn=..., clientId=...}} where
     *         {@code conn} is the current {@link #connectionId}
     */
    private String getSessionId(String str) {
        final long pid = ProcessHandle.current().pid();
        // each label gets its own value: "conn" used to repeat the pid and "clientId" showed the
        // connection id, leaving the parameter unused
        return "cmwLightClient{pid=" + pid + ", conn=" + this.connectionId + ", clientId=" + str + "}";
    }

    /**
     * Starts a connection attempt if, and only if, the data source is currently DISCONNECTED.
     *
     * <p>Addresses without a ':' are treated as device names and looked up through the shared
     * directory client. On any failure the back-off is doubled and the state returns to
     * DISCONNECTED so that {@link #housekeeping()} retries later. On success the state stays
     * CONNECTING until the server's acknowledgement is seen by {@link #getMessage()}.
     */
    public void connect() {
        // compareAndSet leaves an already CONNECTING or CONNECTED source untouched; an
        // unconditional getAndSet would demote a CONNECTED source to CONNECTING
        if (!this.connectionState.compareAndSet(ConnectionState.DISCONNECTED, ConnectionState.CONNECTING)) {
            return;
        }
        String hostAddress = this.address.startsWith(RDA_3_PROTOCOL) ? this.address.substring(RDA_3_PROTOCOL.length()) : this.address;
        if (!hostAddress.contains(":")) {
            try {
                final DirectoryLightClient.Device device = directoryLightClient.getDeviceInfo(Collections.singletonList(hostAddress)).get(0);
                LOGGER.atTrace().addArgument(hostAddress).addArgument(device).log("resolved address for device {}: {}");
                // a missing address entry is treated like any other lookup failure instead of yielding "tcp://null"
                hostAddress = Objects.requireNonNull(device.servers.stream().findFirst().orElseThrow().get("Address:"), "directory entry contains no address");
            } catch (DirectoryLightClient.DirectoryClientException | NullPointerException | NoSuchElementException | IndexOutOfBoundsException e) {
                // NOTE(review): despite its wording, this message is followed by an abort of the attempt, not by a fallback
                LOGGER.atDebug().addArgument(e.getMessage()).log("Error resolving device from nameserver, using address from endpoint. Error was: {}");
                this.backOff *= 2;
                this.connectionState.set(ConnectionState.DISCONNECTED);
                return;
            }
        }
        this.lastHbSent = System.currentTimeMillis();
        try {
            final String identity = getIdentity();
            this.connectedAddress = "tcp://" + hostAddress;
            LOGGER.atDebug().addArgument(this.connectedAddress).addArgument(identity).log("connecting to: {} with identity {}");
            // explicit charset so that the identity bytes do not depend on the platform default
            this.socket.setIdentity(identity.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            this.socket.connect(this.connectedAddress);
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.connect(CmwLightProtocol.VERSION));
        } catch (ZMQException | CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause(e).log("failed to connect: ");
            this.backOff *= 2;
            this.connectionState.set(ConnectionState.DISCONNECTED);
        }
    }

    /**
     * Drops the current connection: sets the state to DISCONNECTED, disconnects the socket from
     * the last connected address and resets the subscription bookkeeping.
     *
     * <p>Active subscriptions are set back to UNSUBSCRIBED so that they are re-established after
     * the next successful connect. Subscriptions the caller already canceled (CANCELED or
     * UNSUBSCRIBE_SENT) are removed instead; resetting them to UNSUBSCRIBED would re-subscribe
     * them after reconnecting.
     */
    private void disconnect() {
        LOGGER.atDebug().addArgument(this.connectedAddress).log("disconnecting {}");
        this.connectionState.set(ConnectionState.DISCONNECTED);
        try {
            this.socket.disconnect(this.connectedAddress);
        } catch (ZMQException e) {
            LOGGER.atError().setCause(e).log("Failed to disconnect socket");
        }
        final Iterator<Subscription> it = this.subscriptions.values().iterator();
        while (it.hasNext()) {
            final Subscription subscription = it.next();
            final SubscriptionState state = subscription.subscriptionState;
            if (state == SubscriptionState.CANCELED || state == SubscriptionState.UNSUBSCRIBE_SENT) {
                it.remove();
                // two-argument remove: only drop index entries that still point at this subscription
                this.subscriptionsByReqId.remove(subscription.idString, subscription);
                this.replyIdMap.remove(Long.valueOf(subscription.updateId), subscription);
            } else {
                subscription.subscriptionState = SubscriptionState.UNSUBSCRIBED;
            }
        }
    }

    /**
     * Periodic maintenance, driven by the connection state:
     * <ul>
     * <li>DISCONNECTED: start a connection attempt once the back-off has elapsed;</li>
     * <li>CONNECTING: give up and disconnect if no acknowledgement arrived in time;</li>
     * <li>CONNECTED: send queued requests, send a heartbeat when one is due, detect a silent
     * server and advance the state of all subscriptions.</li>
     * </ul>
     *
     * @return timestamp (ms) at which the next call is due, except after a heartbeat timeout
     *         where the plain heartbeat interval is returned
     */
    @Override
    public long housekeeping() {
        final long now = System.currentTimeMillis();
        final int connectTimeout = HEARTBEAT_INTERVAL * HEARTBEAT_ALLOWED_MISSES;
        switch (this.connectionState.get()) {
            case DISCONNECTED:
                if (now > this.lastHbSent + this.backOff) {
                    LOGGER.atTrace().addArgument(this.address).log("Connecting to {}");
                    connect();
                }
                return this.lastHbSent + this.backOff;
            case CONNECTING:
                if (now > this.lastHbSent + connectTimeout) {
                    this.backOff *= 2;
                    this.lastHbSent = now;
                    LOGGER.atTrace().addArgument(this.connectedAddress).addArgument(Integer.valueOf(this.backOff)).log("Connection timed out for {}, retrying in {} ms");
                    disconnect();
                }
                return this.lastHbSent + connectTimeout;
            case CONNECTED:
                break;
            default:
                throw new IllegalStateException("unexpected connection state: " + this.connectionState.get());
        }

        // connected: flush everything that was queued by get()/set()
        for (Request next = this.queuedRequests.poll(); next != null; next = this.queuedRequests.poll()) {
            this.pendingRequests.put(next.id, next);
            sendRequest(next);
        }

        if (now <= this.lastHbSent + HEARTBEAT_INTERVAL) {
            return this.lastHbSent + HEARTBEAT_INTERVAL; // no heartbeat due yet
        }
        sendHeartBeat();
        this.lastHbSent = now;
        if (this.lastHbReceived + connectTimeout < now) {
            LOGGER.atDebug().addArgument(Integer.valueOf(this.backOff)).log("Connection timed out, reconnecting in {} ms");
            disconnect();
            // NOTE(review): this is an interval, whereas all other branches return a timestamp
            return HEARTBEAT_INTERVAL;
        }
        for (final Subscription subscription : this.subscriptions.values()) {
            updateSubscription(now, subscription);
        }
        return this.lastHbSent + HEARTBEAT_INTERVAL;
    }

    /**
     * Sends a queued GET or SET request to the server. Protocol errors, including a request of
     * any other type, are logged at debug level and otherwise ignored.
     *
     * @param request the request to send; SET requests must carry non-null data
     */
    private void sendRequest(Request request) {
        final Endpoint target = new Endpoint(request.endpoint);
        try {
            switch (request.requestType) {
                case GET: {
                    final String device = target.getDevice();
                    final String property = target.getProperty();
                    final CmwLightMessage.RequestContext requestContext = new CmwLightMessage.RequestContext(target.getSelector(), target.getFilters(), null);
                    CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.getRequest(this.sessionId, request.id, device, property, requestContext));
                    break;
                }
                case SET: {
                    Objects.requireNonNull(request.data, "Data for set cannot be null");
                    final String device = target.getDevice();
                    final String property = target.getProperty();
                    final ZFrame payload = new ZFrame(request.data);
                    final CmwLightMessage.RequestContext requestContext = new CmwLightMessage.RequestContext(target.getSelector(), target.getFilters(), null);
                    CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.setRequest(this.sessionId, request.id, device, property, payload, requestContext));
                    break;
                }
                default:
                    throw new CmwLightProtocol.RdaLightException("Message of unknown type");
            }
        } catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause(e).log("Error sending get request:");
        }
    }

    /**
     * Advances the state machine of one subscription: retries timed-out subscribe requests,
     * sends due subscribe requests and sends the unsubscribe request for canceled subscriptions.
     *
     * @param now          current time in milliseconds
     * @param subscription the subscription to update
     */
    private void updateSubscription(long now, Subscription subscription) {
        final boolean deadlinePassed = now > subscription.timeoutValue;
        switch (subscription.subscriptionState) {
            case SUBSCRIBING:
                if (deadlinePassed) {
                    // no acknowledgement in time: fall back to UNSUBSCRIBED and retry after the back-off
                    subscription.subscriptionState = SubscriptionState.UNSUBSCRIBED;
                    subscription.timeoutValue = now + subscription.backOff;
                    subscription.backOff *= 2;
                    LOGGER.atDebug().addArgument(subscription.device).addArgument(subscription.property).log("subscription timed out, retrying: {}/{}");
                }
                break;
            case UNSUBSCRIBED:
                if (deadlinePassed) {
                    LOGGER.atDebug().addArgument(subscription.device).addArgument(subscription.property).log("subscribing {}/{}");
                    sendSubscribe(subscription);
                }
                break;
            case CANCELED:
                sendUnsubscribe(subscription);
                break;
            case SUBSCRIBED:
            case UNSUBSCRIBE_SENT:
                break; // nothing to do until the server or the caller acts
            default:
                throw new IllegalStateException("unexpected subscription state: " + subscription.subscriptionState);
        }
    }

    /** Sends a client heartbeat message to the server; protocol errors are logged at debug level and otherwise ignored. */
    public void sendHeartBeat() {
        try {
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.CLIENT_HB);
        } catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause(e).log("Error sending heartbeat");
        }
    }

    /**
     * Sends the subscribe request for a subscription that is currently UNSUBSCRIBED; does nothing
     * for any other state. On success the subscription becomes SUBSCRIBING with a timeout of
     * {@link #SUBSCRIPTION_TIMEOUT}; on a protocol error the retry is postponed by the
     * subscription's back-off, which is then doubled.
     *
     * @param subscription the subscription to establish
     */
    private void sendSubscribe(Subscription subscription) {
        if (!subscription.subscriptionState.equals(SubscriptionState.UNSUBSCRIBED)) {
            return;
        }
        try {
            CmwLightProtocol.sendMsg(this.socket,
                    CmwLightMessage.subscribeRequest(
                            this.sessionId,
                            subscription.id,
                            subscription.device,
                            subscription.property,
                            Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()),
                            new CmwLightMessage.RequestContext(subscription.selector, subscription.filters, null),
                            CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
            subscription.subscriptionState = SubscriptionState.SUBSCRIBING;
            subscription.timeoutValue = System.currentTimeMillis() + SUBSCRIPTION_TIMEOUT;
        } catch (CmwLightProtocol.RdaLightException e) {
            LOGGER.atDebug().setCause(e).log("Error subscribing to property:");
            subscription.timeoutValue = System.currentTimeMillis() + subscription.backOff;
            subscription.backOff *= 2;
        }
    }

    /**
     * Sends the unsubscribe request for the given subscription, addressed by the update id the
     * server assigned to it, and marks the subscription as UNSUBSCRIBE_SENT. A protocol error is
     * logged together with its cause and leaves the subscription state unchanged.
     *
     * @param subscription the subscription to cancel on the server
     */
    private void sendUnsubscribe(Subscription subscription) {
        try {
            CmwLightProtocol.sendMsg(this.socket, CmwLightMessage.unsubscribeRequest(this.sessionId, subscription.updateId, subscription.device, subscription.property, Map.of(CmwLightProtocol.FieldName.SESSION_BODY_TAG.value(), Collections.emptyMap()), CmwLightProtocol.UpdateType.IMMEDIATE_UPDATE));
            subscription.subscriptionState = SubscriptionState.UNSUBSCRIBE_SENT;
        } catch (CmwLightProtocol.RdaLightException e) {
            // the placeholder makes the property argument appear in the output; the cause is no longer dropped
            LOGGER.atError().setCause(e).addArgument(subscription.property).log("failed to unsubscribe {}");
        }
    }
}
