package io.opencmw.client;

import com.lmax.disruptor.EventHandler;
import io.opencmw.EventStore;
import io.opencmw.RingBufferEvent;
import io.opencmw.client.DataSourceFilter;
import io.opencmw.filter.EvtTypeFilter;
import io.opencmw.filter.TimingCtx;
import io.opencmw.rbac.RbacProvider;
import io.opencmw.serialiser.IoBuffer;
import io.opencmw.serialiser.IoClassSerialiser;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.FastByteBuffer;
import io.opencmw.utils.CustomFuture;
import io.opencmw.utils.SharedPointer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:io/opencmw/client/DataSourcePublisher.class */
public class DataSourcePublisher implements Runnable {
    /** Class logger; assigned in the static initialiser. */
    private static final Logger LOGGER;
    /** Counter used to give every publisher instance its own inproc control endpoint name. */
    private static final AtomicInteger INSTANCE_COUNT;
    /** Empty payload substituted for control-message frames that are absent. */
    private static final byte[] EMPTY_BYTE_ARRAY;
    /** Empty frame wrapping {@link #EMPTY_BYTE_ARRAY}; available as placeholder for absent message parts. */
    private static final ZFrame EMPTY_FRAME;
    /** Presumably the minimum number of frames of an internal control message (type, request id, endpoint) - TODO confirm. */
    public static final int MIN_FRAMES_INTERNAL_MSG = 3;
    /** inproc endpoint the control socket binds to and the per-thread sockets connect to. */
    private final String inprocCtrl;
    /** Futures of outstanding requests and active subscriptions, keyed by internal request id. */
    protected final Map<String, ThePromisedFuture<?>> requestFutureMap;
    /** Data source clients keyed by the address part of the endpoint they were created for. */
    protected final Map<String, DataSource> clientMap;
    /** Set to {@code true} when {@link #run()} starts; the run loop exits once it reads {@code false}. */
    private final AtomicBoolean running;
    /** Generator for request-id suffixes and data-source client ids. */
    private final AtomicInteger internalReqIdGenerator;
    /** Internal event store that raw data-source replies are published to. */
    private final EventStore eventStore;
    /** Poller the control socket and all data-source sockets are registered with. */
    private final ZMQ.Poller poller;
    /** ZeroMQ context owning every socket created by this publisher. */
    private final ZContext context;
    /** Receiving end of the inproc control channel (bound to {@link #inprocCtrl}). */
    private final ZMQ.Socket controlSocket;
    /** Default buffer of {@link #ioClassSerialiser}. */
    private final IoBuffer byteBuffer;
    /** Serialiser used for request filters/bodies and for deserialising replies. */
    private final IoClassSerialiser ioClassSerialiser;
    /** Sending end of the inproc control channel, created lazily per calling thread. */
    private final ThreadLocal<ZMQ.Socket> perThreadControlSocket;
    /** Prefix of the internal request ids generated by this publisher. */
    private final String clientId;
    /** Default RBAC provider used when a request does not supply its own; may be {@code null}. */
    private final RbacProvider rbacProvider;
    /** Decompiled form of the compiler-generated assertion switch: {@code true} when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: io.opencmw.client.DataSourcePublisher$2, reason: invalid class name */
    /* loaded from: input_file:io/opencmw/client/DataSourcePublisher$2.class */
    /**
     * Decompiled form of the compiler-generated helper backing a {@code switch} over
     * {@link DataSourceFilter.ReplyType}: it maps each constant's ordinal to a dense case index
     * (SUBSCRIBE=1, GET=2, SET=3, UNSUBSCRIBE=4, UNKNOWN=5).
     */
    public static /* synthetic */ class AnonymousClass2 {
        // indexed by ReplyType.ordinal(); entries left at 0 match no case label
        static final /* synthetic */ int[] $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType = new int[DataSourceFilter.ReplyType.values().length];

        static {
            // each constant is mapped in its own try block so that one constant missing at
            // run time (NoSuchFieldError) does not prevent the remaining ones from being mapped
            try {
                $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType[DataSourceFilter.ReplyType.SUBSCRIBE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // constant not present: its slot stays unmapped
            }
            try {
                $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType[DataSourceFilter.ReplyType.GET.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // constant not present: its slot stays unmapped
            }
            try {
                $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType[DataSourceFilter.ReplyType.SET.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // constant not present: its slot stays unmapped
            }
            try {
                $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType[DataSourceFilter.ReplyType.UNSUBSCRIBE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // constant not present: its slot stays unmapped
            }
            try {
                $SwitchMap$io$opencmw$client$DataSourceFilter$ReplyType[DataSourceFilter.ReplyType.UNKNOWN.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // constant not present: its slot stays unmapped
            }
        }
    }

    /* loaded from: input_file:io/opencmw/client/DataSourcePublisher$ThePromisedFuture.class */
    /**
     * Future handed out for a single request. Besides the reply itself it records the
     * parameters the request was issued with (endpoint, filter, body, expected reply type)
     * and the internal id under which the request is tracked.
     *
     * @param <R> type of the domain object expected as reply
     */
    public static class ThePromisedFuture<R> extends CustomFuture<R> {
        private final String endpoint;
        private final Map<String, Object> requestFilter;
        private final Object requestBody;
        private final Class<R> requestedDomainObjType;
        private final DataSourceFilter.ReplyType replyType;
        private final String internalRequestID;

        /**
         * @param str       endpoint the request is addressed to
         * @param map       request filter, may be {@code null}
         * @param obj       request body, may be {@code null}
         * @param cls       class the reply is deserialised into
         * @param replyType kind of request this future belongs to
         * @param str2      internal id the request is tracked under
         */
        public ThePromisedFuture(String str, Map<String, Object> map, Object obj, Class<R> cls, DataSourceFilter.ReplyType replyType, String str2) {
            this.endpoint = str;
            this.requestFilter = map;
            this.requestBody = obj;
            this.requestedDomainObjType = cls;
            this.replyType = replyType;
            this.internalRequestID = str2;
        }

        /** @return endpoint the request was addressed to */
        public String getEndpoint() {
            return this.endpoint;
        }

        /** @return kind of request this future belongs to */
        public DataSourceFilter.ReplyType getReplyType() {
            return this.replyType;
        }

        /** @return body sent with the request, may be {@code null} */
        public Object getRequestBody() {
            return this.requestBody;
        }

        /** @return filter sent with the request, may be {@code null} */
        public Map<String, Object> getRequestFilter() {
            return this.requestFilter;
        }

        /** @return class the reply is deserialised into */
        public Class<R> getRequestedDomainObjType() {
            return this.requestedDomainObjType;
        }

        /**
         * Completes this future with an untyped reply object.
         *
         * @param obj reply; expected to be an instance of {@code R} (or {@code null})
         */
        @SuppressWarnings("unchecked") // the reply is deserialised into requestedDomainObjType by the publisher
        protected void castAndSetReply(Object obj) {
            // explicit cast: setReply expects R, an Object argument does not type-check
            setReply((R) obj);
        }

        /** @return internal id the request is tracked under */
        public String getInternalRequestID() {
            return this.internalRequestID;
        }
    }

    /**
     * Creates a publisher that forwards decoded replies to an external event store.
     * A handler is registered on the publisher's internal event store; for every SUBSCRIBE
     * or GET reply it deserialises the payload and republishes the domain object to
     * {@code eventStore}. GET replies additionally complete (or fail) the request's future.
     *
     * @param rbacProvider default RBAC provider, may be {@code null}
     * @param eventStore   external event store receiving the decoded domain objects
     * @param strArr       optional client id (used only when exactly one value is given)
     */
    public DataSourcePublisher(RbacProvider rbacProvider, EventStore eventStore, String... strArr) {
        this(rbacProvider, strArr);
        // typed local: gives the lambda a proper EventHandler<RingBufferEvent> target type
        final EventHandler<RingBufferEvent> replyHandler = (rawEvent, sequence, endOfBatch) -> forwardReply(eventStore, rawEvent);
        this.eventStore.register(replyHandler);
    }

    /** Decodes one raw reply event, completes GET futures and republishes the domain object to {@code target}. */
    private void forwardReply(final EventStore target, final RingBufferEvent rawEvent) {
        final DataSourceFilter dataSourceFilter = rawEvent.getFilter(DataSourceFilter.class);
        final ThePromisedFuture<?> future = dataSourceFilter.future;
        final DataSourceFilter.ReplyType replyType = future.getReplyType();
        final boolean isGet = replyType == DataSourceFilter.ReplyType.GET;
        if (!isGet && replyType != DataSourceFilter.ReplyType.SUBSCRIBE) {
            // other reply types are only logged
            LOGGER.atInfo().addArgument(rawEvent.payload.get()).log("{}");
            return;
        }

        final ZMsg reply = (ZMsg) rawEvent.payload.get(ZMsg.class);
        Objects.requireNonNull(reply.poll()); // leading frame is not evaluated here, but must be present
        final byte[] body = Objects.requireNonNull(reply.poll()).getData();
        final String errorText = Objects.requireNonNull(reply.poll()).getString(Charset.defaultCharset());
        final boolean failed = errorText != null && !errorText.isBlank();
        final Object domainObject = deserialiseReplyBody(body, future.getRequestedDomainObjType());

        if (isGet) {
            // only GET futures are completed; subscriptions deliver exclusively via the event store
            if (failed) {
                future.setException(new Exception(errorText));
            } else {
                future.castAndSetReply(domainObject);
            }
        }

        target.getRingBuffer().publishEvent((outEvent, outSequence, domainObj) -> {
            final TimingCtx timingCtx = outEvent.getFilter(TimingCtx.class);
            final EvtTypeFilter evtTypeFilter = outEvent.getFilter(EvtTypeFilter.class);
            // carry the arrival time of the raw reply over to the republished event
            outEvent.arrivalTimeStamp = rawEvent.arrivalTimeStamp;
            outEvent.payload = new SharedPointer<>();
            outEvent.payload.set(domainObj);
            if (failed) {
                outEvent.throwables.add(new Exception(errorText));
            }
            try {
                timingCtx.setSelector(dataSourceFilter.context, 0L);
            } catch (IllegalArgumentException e) {
                LOGGER.atError().setCause(e).addArgument(dataSourceFilter.context).log("No valid context: {}");
            }
            evtTypeFilter.evtType = EvtTypeFilter.DataType.DEVICE_DATA;
            evtTypeFilter.typeName = dataSourceFilter.device + "/" + dataSourceFilter.property;
            evtTypeFilter.updateType = EvtTypeFilter.UpdateType.COMPLETE;
        }, domainObject);
    }

    /** Deserialises a reply body into {@code type}; returns {@code null} for a missing or empty body. */
    private Object deserialiseReplyBody(final byte[] body, final Class<?> type) {
        if (body == null || body.length == 0) {
            return null;
        }
        this.ioClassSerialiser.setDataBuffer(FastByteBuffer.wrap(body));
        try {
            return this.ioClassSerialiser.deserialiseObject(type);
        } finally {
            // always switch back to the default buffer, also when deserialisation fails
            this.ioClassSerialiser.setDataBuffer(this.byteBuffer);
        }
    }

    public DataSourcePublisher(RbacProvider rbacProvider, EventHandler<RingBufferEvent> eventHandler, String... strArr) {
        this(rbacProvider, strArr);
        this.eventStore.register(new EventHandler[]{eventHandler});
    }

    /**
     * Creates a publisher with its own ZeroMQ context, inproc control channel and internal
     * event store. No event handler is registered by this constructor.
     *
     * @param rbacProvider default RBAC provider, may be {@code null}
     * @param strArr       optional client id; used only when exactly one value is given,
     *                     otherwise the class name serves as client id
     */
    public DataSourcePublisher(RbacProvider rbacProvider, String... strArr) {
        this.inprocCtrl = "inproc://dsPublisher#" + INSTANCE_COUNT.incrementAndGet();
        this.requestFutureMap = new ConcurrentHashMap<>();
        this.clientMap = new ConcurrentHashMap<>();
        this.running = new AtomicBoolean(false);
        this.internalReqIdGenerator = new AtomicInteger(0);
        this.context = new ZContext(1);
        this.byteBuffer = new FastByteBuffer(2000);
        this.ioClassSerialiser = new IoClassSerialiser(this.byteBuffer, new Class[0]);
        this.perThreadControlSocket = new ThreadLocal<ZMQ.Socket>() { // from class: io.opencmw.client.DataSourcePublisher.1
            @Override // java.lang.ThreadLocal
            public void remove() {
                // disconnect the calling thread's own socket; a socket cached in a field of this
                // (shared) ThreadLocal instance would be the one created last by *any* thread
                final ZMQ.Socket socket = get();
                if (socket != null) {
                    socket.disconnect(DataSourcePublisher.this.inprocCtrl);
                }
                super.remove();
            }

            @Override // java.lang.ThreadLocal
            protected ZMQ.Socket initialValue() {
                final ZMQ.Socket socket = DataSourcePublisher.this.context.createSocket(SocketType.DEALER);
                socket.connect(DataSourcePublisher.this.inprocCtrl);
                return socket;
            }
        };
        this.poller = this.context.createPoller(1);
        this.controlSocket = this.context.createSocket(SocketType.DEALER);
        this.controlSocket.bind(this.inprocCtrl);
        this.poller.register(this.controlSocket, ZMQ.Poller.POLLIN);
        this.eventStore = EventStore.getFactory().setSingleProducer(true).setFilterConfig(new Class[]{DataSourceFilter.class}).build();
        // a null varargs array is treated like "no client id given"
        this.clientId = strArr != null && strArr.length == 1 ? strArr[0] : DataSourcePublisher.class.getName();
        this.rbacProvider = rbacProvider;
    }

    /**
     * @return the ZeroMQ context created by this publisher
     */
    public ZContext getContext() {
        final ZContext ownContext = this.context;
        return ownContext;
    }

    /**
     * @return the internal event store raw data-source replies are published to
     */
    public EventStore getEventStore() {
        final EventStore internalStore = this.eventStore;
        return internalStore;
    }

    /**
     * Issues a SET request without a request filter.
     *
     * @param str             endpoint to send the request to
     * @param cls             class the reply is deserialised into
     * @param obj             request body
     * @param rbacProviderArr optional per-request RBAC provider
     * @param <R>             reply type
     * @return future of the reply
     */
    public <R> Future<R> set(String str, Class<R> cls, Object obj, RbacProvider... rbacProviderArr) {
        final Map<String, Object> noFilter = null;
        return set(str, noFilter, obj, cls, rbacProviderArr);
    }

    /**
     * Issues a SET request.
     *
     * @param str             endpoint to send the request to
     * @param map             request filter, may be {@code null}
     * @param obj             request body, may be {@code null}
     * @param cls             class the reply is deserialised into
     * @param rbacProviderArr optional per-request RBAC provider
     * @param <R>             reply type
     * @return future of the reply
     */
    public <R> Future<R> set(String str, Map<String, Object> map, Object obj, Class<R> cls, RbacProvider... rbacProviderArr) {
        final ThePromisedFuture<R> promise = request(DataSourceFilter.ReplyType.SET, str, map, obj, cls, rbacProviderArr);
        return (Future<R>) promise;
    }

    /**
     * Issues a GET request without request filter and body.
     *
     * @param str             endpoint to send the request to
     * @param cls             class the reply is deserialised into
     * @param rbacProviderArr optional per-request RBAC provider
     * @param <R>             reply type
     * @return future of the reply
     */
    public <R> Future<R> get(String str, Class<R> cls, RbacProvider... rbacProviderArr) {
        final Map<String, Object> noFilter = null;
        final Object noBody = null;
        return get(str, noFilter, noBody, cls, rbacProviderArr);
    }

    /**
     * Issues a GET request.
     *
     * @param str             endpoint to send the request to
     * @param map             request filter, may be {@code null}
     * @param obj             request body, may be {@code null}
     * @param cls             class the reply is deserialised into
     * @param rbacProviderArr optional per-request RBAC provider
     * @param <R>             reply type
     * @return future of the reply
     */
    public <R> Future<R> get(String str, Map<String, Object> map, Object obj, Class<R> cls, RbacProvider... rbacProviderArr) {
        final ThePromisedFuture<R> promise = request(DataSourceFilter.ReplyType.GET, str, map, obj, cls, rbacProviderArr);
        return (Future<R>) promise;
    }

    /**
     * Registers a future for a new request and hands the request to the publisher thread
     * via the calling thread's inproc control socket.
     * <p>
     * Frame layout of the control message: request type, internal request id, endpoint,
     * serialised filter (or empty), serialised body (or empty) and - only when no RBAC
     * provider is available - one trailing empty frame.
     *
     * @param replyType       kind of request
     * @param str             endpoint to send the request to
     * @param map             request filter, may be {@code null}
     * @param obj             request body, may be {@code null}
     * @param cls             class the reply is deserialised into
     * @param rbacProviderArr optional per-request RBAC provider (first element is relevant)
     * @param <R>             reply type
     * @return the future registered for this request
     */
    private <R> ThePromisedFuture<R> request(DataSourceFilter.ReplyType replyType, String str, Map<String, Object> map, Object obj, Class<R> cls, RbacProvider... rbacProviderArr) {
        final String requestId = this.clientId + this.internalReqIdGenerator.incrementAndGet();
        final ThePromisedFuture<R> future = newFuture(str, map, obj, cls, replyType, requestId);
        final ZMsg controlMsg = new ZMsg();
        try {
            final Class<? extends IoSerialiser> serialiserType = DataSource.getFactory(str).getMatchingSerialiserType(str);
            controlMsg.add(new byte[]{replyType.getID()});
            controlMsg.add(requestId);
            controlMsg.add(str);
            controlMsg.add(serialiseOrEmpty(map, serialiserType));
            controlMsg.add(serialiseOrEmpty(obj, serialiserType));
            final boolean rbacAvailable = (rbacProviderArr != null && rbacProviderArr.length > 0) || this.rbacProvider != null;
            if (!rbacAvailable) {
                controlMsg.add(EMPTY_BYTE_ARRAY);
            }
            // NOTE(review): with an RBAC provider present nothing is appended yet - the provider is
            // not evaluated by this method; the receiver substitutes an empty token for the missing frame
        } catch (RuntimeException e) {
            // the request never left this method: do not keep its future registered forever
            this.requestFutureMap.remove(requestId, future);
            throw e;
        }
        controlMsg.send(this.perThreadControlSocket.get());
        this.perThreadControlSocket.remove();
        return future;
    }

    /**
     * Serialises {@code value} with the given serialiser type into a fresh byte array.
     * A fresh array (and thus a fresh frame) is used per message part because JeroMQ's
     * {@code ZMsg.send} destroys the frames it sent - a shared static frame must not be enqueued.
     *
     * @return the serialised bytes, or an empty array when {@code value} is {@code null}
     */
    private byte[] serialiseOrEmpty(final Object value, final Class<? extends IoSerialiser> serialiserType) {
        if (value == null) {
            return EMPTY_BYTE_ARRAY;
        }
        this.ioClassSerialiser.getDataBuffer().reset();
        this.ioClassSerialiser.setMatchedIoSerialiser(serialiserType);
        this.ioClassSerialiser.serialiseObject(value);
        // re-read the buffer after serialising: its backing array may have been re-allocated
        final IoBuffer buffer = this.ioClassSerialiser.getDataBuffer();
        return Arrays.copyOfRange(buffer.elements(), 0, buffer.position());
    }

    /**
     * Subscribes to an endpoint without filter, body or per-request RBAC provider.
     * The internal request id of the subscription is not returned by this overload.
     *
     * @param str endpoint to subscribe to
     * @param cls class the notifications are deserialised into
     * @param <T> notification type
     */
    public <T> void subscribe(String str, Class<T> cls) {
        final Map<String, Object> noFilter = null;
        final Object noBody = null;
        subscribe(str, cls, noFilter, noBody, new RbacProvider[0]);
    }

    /**
     * Subscribes to an endpoint.
     *
     * @param str             endpoint to subscribe to
     * @param cls             class the notifications are deserialised into
     * @param map             request filter, may be {@code null}
     * @param obj             request body, may be {@code null}
     * @param rbacProviderArr optional per-request RBAC provider
     * @param <R>             notification type
     * @return internal request id of the subscription, as accepted by {@link #unsubscribe(String)}
     */
    public <R> String subscribe(String str, Class<R> cls, Map<String, Object> map, Object obj, RbacProvider... rbacProviderArr) {
        final ThePromisedFuture<R> subscription = request(DataSourceFilter.ReplyType.SUBSCRIBE, str, map, obj, cls, rbacProviderArr);
        return subscription.getInternalRequestID();
    }

    /**
     * Asks the publisher thread to cancel a subscription.
     * An id that is not (or no longer) registered is logged and otherwise ignored, so that a
     * repeated or late unsubscribe does not fail with a {@link NullPointerException}.
     *
     * @param str internal request id of the subscription
     */
    public void unsubscribe(String str) {
        final ThePromisedFuture<?> subscription = this.requestFutureMap.get(str);
        if (subscription == null) {
            LOGGER.atWarn().addArgument(str).log("cannot unsubscribe - unknown request id '{}'");
            return;
        }
        final ZMsg controlMsg = new ZMsg();
        controlMsg.add(new byte[]{DataSourceFilter.ReplyType.UNSUBSCRIBE.getID()});
        controlMsg.add(str);
        controlMsg.add(subscription.getEndpoint());
        controlMsg.send(this.perThreadControlSocket.get());
        this.perThreadControlSocket.remove();
    }

    /**
     * Main loop of the publisher: starts the internal event store, then alternates between
     * draining the data-source and control sockets and running the clients' housekeeping.
     * The loop ends when the thread is interrupted, the running flag is cleared or the
     * poller returns {@code -1}.
     */
    @Override // java.lang.Runnable
    public void run() {
        this.eventStore.start();
        this.running.set(true);
        long nextHousekeeping = System.currentTimeMillis();
        long pollTimeout = 0;
        // the poller is only consulted when there is time left until the next housekeeping
        while (!Thread.interrupted() && this.running.get() && (pollTimeout <= 0 || this.poller.poll(pollTimeout) != -1)) {
            boolean dataAvailable = true;
            while (dataAvailable && System.currentTimeMillis() < nextHousekeeping && this.running.get()) {
                // both sockets are always serviced (no short-circuit)
                final boolean fromSources = handleDataSourceSockets();
                final boolean fromControl = handleControlSocket();
                dataAvailable = fromSources || fromControl;
            }
            // earliest deadline reported by any client, or one second from now without clients
            nextHousekeeping = this.clientMap.values().stream().mapToLong(DataSource::housekeeping).min().orElse(System.currentTimeMillis() + 1000);
            pollTimeout = nextHousekeeping - System.currentTimeMillis();
        }
        LOGGER.atDebug().addArgument(this.clientMap.values()).log("poller returned negative value - abort run() - clients = {}");
    }

    /**
     * Runs this publisher's {@link #run()} loop on a newly created thread.
     */
    public void start() {
        final Thread worker = new Thread(this);
        worker.start();
    }

    /**
     * Receives (non-blocking) one message from the control socket and dispatches it to the
     * data-source client responsible for the message's endpoint.
     * <p>
     * Expected frames: request type, internal request id, endpoint, followed by up to three
     * optional frames (serialised filter, serialised body, RBAC token); missing optional
     * frames are replaced by empty arrays.
     *
     * @return {@code false} if no message was pending, {@code true} otherwise
     * @throws UnsupportedOperationException for a request type that cannot be dispatched
     */
    protected boolean handleControlSocket() {
        final ZMsg request = ZMsg.recvMsg(this.controlSocket, false);
        if (request == null) {
            return false;
        }
        if (request.size() < MIN_FRAMES_INTERNAL_MSG) {
            LOGGER.atDebug().log("ignoring invalid message");
            return true;
        }
        final byte[] typeId = request.pollFirst().getData();
        if (typeId == null || typeId.length == 0) {
            // no type byte to evaluate - treat like any other malformed message
            LOGGER.atDebug().log("ignoring invalid message");
            return true;
        }
        final DataSourceFilter.ReplyType requestType = DataSourceFilter.ReplyType.valueOf(typeId[0]);
        // String frames are written through ZMsg.add(String), which JeroMQ encodes as UTF-8
        final String requestId = Objects.requireNonNull(request.pollFirst()).getString(StandardCharsets.UTF_8);
        final String endpoint = Objects.requireNonNull(request.pollFirst()).getString(StandardCharsets.UTF_8);
        final byte[] filter = pollDataOrEmpty(request);
        final byte[] body = pollDataOrEmpty(request);
        final byte[] rbacToken = pollDataOrEmpty(request);
        final DataSource client = getClient(endpoint);
        switch (requestType) {
            case SUBSCRIBE:
                client.subscribe(requestId, endpoint, rbacToken);
                return true;
            case GET:
                client.get(requestId, endpoint, filter, body, rbacToken);
                return true;
            case SET:
                client.set(requestId, endpoint, filter, body, rbacToken);
                return true;
            case UNSUBSCRIBE:
                client.unsubscribe(requestId);
                this.requestFutureMap.remove(requestId);
                return true;
            case UNKNOWN:
            default:
                throw new UnsupportedOperationException("Illegal operation type");
        }
    }

    /** Removes and returns the payload of the next frame, or an empty array if the message has no frame left. */
    private static byte[] pollDataOrEmpty(final ZMsg msg) {
        return msg.isEmpty() ? EMPTY_BYTE_ARRAY : msg.pollFirst().getData();
    }

    /**
     * Fetches at most one pending message from every data-source client and publishes it to
     * the internal event store together with the future of the request it answers.
     * <p>
     * The first frame is taken as internal request id, the second as endpoint; the remaining
     * frames stay in the message that becomes the event payload. Futures of one-shot
     * requests (everything but SUBSCRIBE) are removed from {@link #requestFutureMap}.
     * Replies whose request id is not registered (e.g. a notification arriving after the
     * subscription was removed) are logged and dropped instead of failing inside the
     * ring-buffer translator.
     *
     * @return {@code true} if at least one client returned a message
     */
    protected boolean handleDataSourceSockets() {
        boolean dataAvailable = false;
        for (final DataSource client : this.clientMap.values()) {
            final ZMsg reply = client.getMessage();
            if (reply == null) {
                continue;
            }
            dataAvailable = true;
            if (reply.isEmpty()) {
                continue;
            }
            // NOTE(review): decoded with the platform charset as before - the encoding used by the
            // DataSource implementations is not visible here; confirm whether UTF-8 is intended
            final String requestId = Objects.requireNonNull(reply.pollFirst()).getString(Charset.defaultCharset());
            final ThePromisedFuture<?> future = this.requestFutureMap.get(requestId);
            if (future == null) {
                LOGGER.atWarn().addArgument(requestId).log("dropping reply for unknown request id '{}'");
                continue;
            }
            if (future.getReplyType() != DataSourceFilter.ReplyType.SUBSCRIBE) {
                if (!$assertionsDisabled && !future.getInternalRequestID().equals(requestId)) {
                    throw new AssertionError("requestID mismatch");
                }
                // one-shot request: its future is no longer needed in the map
                this.requestFutureMap.remove(requestId);
            }
            final Endpoint endpoint = new Endpoint(Objects.requireNonNull(reply.pollFirst()).getString(Charset.defaultCharset()));
            this.eventStore.getRingBuffer().publishEvent((event, sequence) -> {
                event.arrivalTimeStamp = System.currentTimeMillis();
                event.payload = new SharedPointer<>();
                event.payload.set(reply);
                final DataSourceFilter dataSourceFilter = event.getFilter(DataSourceFilter.class);
                dataSourceFilter.future = future;
                // NOTE(review): eventType is SUBSCRIBE for every reply, independent of the future's reply type - confirm
                dataSourceFilter.eventType = DataSourceFilter.ReplyType.SUBSCRIBE;
                dataSourceFilter.device = endpoint.getDevice();
                dataSourceFilter.property = endpoint.getProperty();
                dataSourceFilter.context = endpoint.getSelector();
            });
        }
        return dataAvailable;
    }

    /**
     * Creates the future for a new request and registers it under its internal request id.
     * With assertions enabled, an id that is already registered raises an {@link AssertionError}.
     *
     * @param str       endpoint the request is addressed to
     * @param map       request filter, may be {@code null}
     * @param obj       request body, may be {@code null}
     * @param cls       class the reply is deserialised into
     * @param replyType kind of request
     * @param str2      internal request id
     * @param <R>       reply type
     * @return the newly registered future
     */
    protected <R> ThePromisedFuture<R> newFuture(String str, Map<String, Object> map, Object obj, Class<R> cls, DataSourceFilter.ReplyType replyType, String str2) {
        final ThePromisedFuture<R> created = new ThePromisedFuture<>(str, map, obj, cls, replyType, str2);
        final ThePromisedFuture<?> previous = this.requestFutureMap.put(str2, created);
        // equivalent of an assert statement: only evaluated when assertions are enabled
        if (!$assertionsDisabled && previous != null) {
            throw new AssertionError("requestID '" + str2 + "' already present in requestFutureMap");
        }
        return created;
    }

    /**
     * Returns the data-source client for the address part of an endpoint, creating it and
     * registering its socket with the poller on first use.
     *
     * @param str full endpoint of the request
     * @return the client serving the endpoint's address
     */
    private DataSource getClient(String str) {
        final String address = new Endpoint(str).getAddress();
        return this.clientMap.computeIfAbsent(address, key -> {
            final DataSource created = DataSource.getFactory(key).newInstance(this.context, str, Duration.ofMillis(100L), Long.toString(this.internalReqIdGenerator.incrementAndGet()));
            // same event flag (1) the control socket is registered with
            this.poller.register(created.getSocket(), 1);
            return created;
        });
    }

    static {
        $assertionsDisabled = !DataSourcePublisher.class.desiredAssertionStatus();
        LOGGER = LoggerFactory.getLogger(DataSourcePublisher.class);
        INSTANCE_COUNT = new AtomicInteger();
        EMPTY_BYTE_ARRAY = new byte[0];
        EMPTY_FRAME = new ZFrame(EMPTY_BYTE_ARRAY);
    }
}
