package io.opencmw.client.rest;

import io.opencmw.MimeType;
import io.opencmw.OpenCmwProtocol;
import io.opencmw.client.DataSource;
import io.opencmw.serialiser.IoSerialiser;
import io.opencmw.serialiser.spi.JsonSerialiser;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import okhttp3.Call;
import okhttp3.Callback;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.spi.LoggingEventBuilder;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

/* loaded from: input_file:io/opencmw/client/rest/RestDataSource.class */
public class RestDataSource extends DataSource implements Runnable {
    public static final DataSource.Factory FACTORY;
    private static final Logger LOGGER;
    private static final int WAIT_TIMEOUT_MILLIS = 1000;
    private static final AtomicInteger REST_DATA_SOURCE_INSTANCE;
    private static final int MAX_RETRIES = 3;
    private static final AtomicLong PUBLICATION_COUNTER;
    protected static OkHttpClient okClient;
    protected static EventSource.Factory eventSourceFactory;
    protected final AtomicBoolean run;
    protected final String uniqueID;
    protected final byte[] uniqueIdBytes;
    protected final String endpoint;
    protected final Duration timeOut;
    protected final String clientID;
    protected int cancelLastCall;
    protected final ZContext ctxCopy;
    protected final Object newData;
    protected final Timer timer;
    protected final List<RestCallBack> pendingCallbacks;
    protected final List<RestCallBack> completedCallbacks;
    protected final BlockingQueue<String> requestQueue;
    protected Map<String, EventSource> sseSource;
    protected ZMQ.Socket internalSocket;
    protected ZMQ.Socket externalSocket;
    protected final TimerTask wakeupTask;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:io/opencmw/client/rest/RestDataSource$RestCallBack.class */
    public class RestCallBack implements Callback {
        private final String hashKey;
        private final String endPointName;
        private final MimeType mimeType;
        private final long requestTimeStamp = System.currentTimeMillis();
        private boolean active = true;
        private final AtomicInteger retryCount = new AtomicInteger();
        private final Lock lock = new ReentrantLock();
        private Response response;
        private Exception exception;

        public RestCallBack(String str, String str2, MimeType mimeType) {
            this.hashKey = str;
            this.endPointName = str2;
            this.mimeType = mimeType;
        }

        public String toString() {
            String str = this.hashKey;
            String str2 = this.endPointName;
            long j = this.requestTimeStamp;
            boolean z = this.active;
            AtomicInteger atomicInteger = this.retryCount;
            Response response = this.response;
            Exception exc = this.exception;
            return "RestCallBack{hashKey='" + str + "', endPointName='" + str2 + "', requestTimeStamp=" + j + ", active=" + str + ", retryCount=" + z + ", result=" + atomicInteger + ", exception=" + response + "}";
        }

        public void checkTimeOut() {
            if (!this.active || RestDataSource.this.timeOut.toMillis() <= 0) {
                return;
            }
            long currentTimeMillis = System.currentTimeMillis();
            if (this.requestTimeStamp + RestDataSource.this.timeOut.toMillis() < currentTimeMillis) {
                this.lock.lock();
                String str = RestDataSource.this.endpoint;
                this.exception = new TimeoutException("ts=" + currentTimeMillis + " - time-out of REST request for endpoint: " + this);
                notifyResult();
                this.lock.unlock();
            }
        }

        public void onFailure(@NotNull Call call, @NotNull IOException iOException) {
            if (this.active) {
                if (this.retryCount.incrementAndGet() > 3) {
                    RestDataSource.LOGGER.atWarn().setCause(iOException).addArgument(3).addArgument(RestDataSource.this.endpoint).log("failed after {} connect/receive retries - abort");
                    this.lock.lock();
                    this.exception = iOException;
                    notifyResult();
                    this.lock.unlock();
                    RestDataSource.LOGGER.atWarn().addArgument(iOException.getLocalizedMessage()).log("RestCallBack-Failure: '{}'");
                    return;
                }
                this.lock.lock();
                this.exception = iOException;
                this.lock.unlock();
                LoggingEventBuilder atWarn = RestDataSource.LOGGER.atWarn();
                if (RestDataSource.LOGGER.isTraceEnabled()) {
                    atWarn.setCause(iOException);
                }
                atWarn.addArgument(Integer.valueOf(this.retryCount.get())).addArgument(3).addArgument(RestDataSource.this.endpoint).log("retry {} of {}: could not connect/receive from endpoint {}");
                LockSupport.parkNanos(RestDataSource.this.timeOut.toMillis() * (1 << (2 * (this.retryCount.get() - 1))));
                Call newCall = RestDataSource.okClient.newCall(new Request.Builder().url(this.endPointName).get().addHeader("Accept", this.mimeType.toString()).build());
                newCall.enqueue(this);
                if (RestDataSource.this.cancelLastCall > 0) {
                    newCall.cancel();
                    RestDataSource.this.cancelLastCall--;
                }
            }
        }

        public void onResponse(@NotNull Call call, @NotNull Response response) {
            if (this.active) {
                this.lock.lock();
                this.response = response;
                notifyResult();
                this.lock.unlock();
                if (RestDataSource.LOGGER.isTraceEnabled()) {
                    RestDataSource.LOGGER.atTrace().addArgument(response).log("RestCallBack: '{}'");
                }
            }
        }

        private void notifyResult() {
            synchronized (RestDataSource.this.newData) {
                this.active = false;
                RestDataSource.this.pendingCallbacks.remove(this);
                RestDataSource.this.completedCallbacks.add(this);
                RestDataSource.this.newData.notifyAll();
            }
        }
    }

    protected RestDataSource(ZContext zContext, String str) {
        this(zContext, str, Duration.ofMillis(0L), RestDataSource.class.getName());
    }

    public RestDataSource(ZContext zContext, String str, Duration duration, String str2) {
        super(str);
        this.run = new AtomicBoolean(true);
        this.newData = new Object();
        this.timer = new Timer();
        this.pendingCallbacks = Collections.synchronizedList(new ArrayList());
        this.completedCallbacks = Collections.synchronizedList(new ArrayList());
        this.requestQueue = new LinkedBlockingDeque();
        this.sseSource = new HashMap();
        this.wakeupTask = new TimerTask() { // from class: io.opencmw.client.rest.RestDataSource.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                synchronized (RestDataSource.this.newData) {
                    RestDataSource.this.newData.notifyAll();
                }
            }
        };
        synchronized (LOGGER) {
            if (okClient == null) {
                okClient = new OkHttpClient();
                eventSourceFactory = EventSources.createFactory(okClient);
            }
        }
        if (duration == null) {
            throw new IllegalArgumentException("timeOut is null");
        }
        this.ctxCopy = zContext == null ? new ZContext() : zContext;
        this.endpoint = str;
        this.timeOut = duration;
        this.clientID = str2;
        this.uniqueID = str2 + "PID=" + ManagementFactory.getRuntimeMXBean().getName() + "-InstanceID=" + REST_DATA_SOURCE_INSTANCE.getAndIncrement();
        this.uniqueIdBytes = this.uniqueID.getBytes(ZMQ.CHARSET);
        if (duration.toMillis() > 0) {
            this.timer.scheduleAtFixedRate(this.wakeupTask, 0L, duration.toMillis());
        }
        start();
    }

    private void createPair() {
        if (this.internalSocket != null) {
            this.internalSocket.close();
        }
        if (this.externalSocket != null) {
            this.externalSocket.close();
        }
        this.internalSocket = this.ctxCopy.createSocket(SocketType.PAIR);
        if (!$assertionsDisabled && this.internalSocket == null) {
            throw new AssertionError("internalSocket being initialised");
        }
        if (!this.internalSocket.setHWM(0)) {
            throw new IllegalStateException("could not set HWM on internalSocket");
        }
        if (!this.internalSocket.setIdentity(this.uniqueIdBytes)) {
            throw new IllegalStateException("could not set identity on internalSocket");
        }
        if (!this.internalSocket.bind("inproc://" + this.uniqueID)) {
            throw new IllegalStateException("could not bind internalSocket to: inproc://" + this.uniqueID);
        }
        this.externalSocket = this.ctxCopy.createSocket(SocketType.PAIR);
        if (!$assertionsDisabled && this.externalSocket == null) {
            throw new AssertionError("externalSocket being initialised");
        }
        if (!this.externalSocket.setHWM(0)) {
            throw new IllegalStateException("could not set HWM on externalSocket");
        }
        if (!this.externalSocket.connect("inproc://" + this.uniqueID)) {
            throw new IllegalStateException("could not bind externalSocket to: inproc://" + this.uniqueID);
        }
        LOGGER.atTrace().addArgument(this.endpoint).log("(re-)connecting to REST endpoint: '{}'");
    }

    @Override // io.opencmw.client.DataSource
    public void get(String str, String str2, byte[] bArr, byte[] bArr2, byte[] bArr3) {
        enqueueRequest(str);
    }

    @Override // io.opencmw.client.DataSource
    public void set(String str, String str2, byte[] bArr, byte[] bArr2, byte[] bArr3) {
        throw new UnsupportedOperationException("set not (yet) implemented");
    }

    public void enqueueRequest(String str) {
        if (!this.requestQueue.offer(str)) {
            throw new IllegalStateException("could not add hashKey " + str + " to request queue of endpoint " + this.endpoint);
        }
        synchronized (this.newData) {
            this.newData.notifyAll();
        }
    }

    @Override // io.opencmw.client.DataSource
    public void subscribe(String str, final String str2, byte[] bArr) {
        this.sseSource.put(str, eventSourceFactory.newEventSource(new Request.Builder().url(str2).build(), new EventSourceListener() { // from class: io.opencmw.client.rest.RestDataSource.3
            public void onEvent(@NotNull EventSource eventSource, String str3, String str4, @NotNull String str5) {
                RestDataSource.this.getRequest(RestDataSource.this.clientID + "#" + RestDataSource.PUBLICATION_COUNTER.getAndIncrement(), str2, MimeType.TEXT);
            }
        }));
    }

    @Override // io.opencmw.client.DataSource
    public void unsubscribe(String str) {
        EventSource remove = this.sseSource.remove(str);
        if (remove != null) {
            remove.cancel();
        }
    }

    public ZContext getCtx() {
        return this.ctxCopy;
    }

    @Override // io.opencmw.client.DataSource
    public ZMQ.Socket getSocket() {
        return this.externalSocket;
    }

    @Override // io.opencmw.client.DataSource
    protected DataSource.Factory getFactory() {
        return FACTORY;
    }

    @Override // io.opencmw.client.DataSource
    public ZMsg getMessage() {
        return ZMsg.recvMsg(this.externalSocket, false);
    }

    @Override // io.opencmw.client.DataSource
    public long housekeeping() {
        synchronized (this.newData) {
            Iterator it = new ArrayList(this.pendingCallbacks).iterator();
            while (it.hasNext()) {
                ((RestCallBack) it.next()).checkTimeOut();
            }
            while (!this.requestQueue.isEmpty()) {
                try {
                    String take = this.requestQueue.take();
                    if (LOGGER.isTraceEnabled()) {
                        LOGGER.atTrace().addArgument(take).log("external request with hashKey = '{}'");
                    }
                    getRequest(take, this.endpoint, MimeType.TEXT);
                } catch (InterruptedException e) {
                    LOGGER.atError().setCause(e).addArgument(this.endpoint).log("error in retrieving requestQueue items for endpoint: {}");
                }
            }
        }
        return System.currentTimeMillis() + this.timeOut.toMillis();
    }

    @Override // java.lang.Runnable
    public void run() {
        byte[] bytes;
        byte[] bytes2;
        this.run.set(true);
        while (this.run.get() && !Thread.interrupted()) {
            try {
                try {
                    synchronized (this.newData) {
                        if (this.completedCallbacks.isEmpty() && this.requestQueue.isEmpty()) {
                            this.newData.wait(this.timeOut.toMillis() <= 0 ? TimeUnit.MILLISECONDS.toMillis(1000L) : this.timeOut.toMillis());
                        }
                        for (RestCallBack restCallBack : this.completedCallbacks) {
                            if (restCallBack.response == null) {
                                bytes = OpenCmwProtocol.EMPTY_FRAME;
                                bytes2 = OpenCmwProtocol.EMPTY_FRAME;
                            } else {
                                bytes = restCallBack.response.headers().toString().getBytes(StandardCharsets.UTF_8);
                                bytes2 = restCallBack.response.peekBody(Long.MAX_VALUE).bytes();
                                restCallBack.response.close();
                            }
                            byte[] bytes3 = restCallBack.exception == null ? OpenCmwProtocol.EMPTY_FRAME : restCallBack.exception.getMessage().getBytes(StandardCharsets.UTF_8);
                            ZMsg zMsg = new ZMsg();
                            zMsg.add(restCallBack.hashKey);
                            zMsg.add(restCallBack.endPointName);
                            zMsg.add(bytes);
                            zMsg.add(bytes2);
                            zMsg.add(bytes3);
                            if (!zMsg.send(this.internalSocket)) {
                                throw new IllegalStateException("internalSocket could not send message - error code: " + this.internalSocket.errno());
                            }
                        }
                        this.completedCallbacks.clear();
                        housekeeping();
                    }
                } catch (Exception e) {
                    LOGGER.atError().setCause(e).log("data acquisition loop abnormally terminated");
                    this.externalSocket.close();
                    this.internalSocket.close();
                }
            } catch (Throwable th) {
                this.externalSocket.close();
                this.internalSocket.close();
                throw th;
            }
        }
        this.externalSocket.close();
        this.internalSocket.close();
        LOGGER.atTrace().addArgument(this.uniqueID).addArgument(Boolean.valueOf(this.run.get())).log("stop poller thread for uniqueID={} - run={}");
    }

    public void start() {
        createPair();
        new Thread(this).start();
    }

    public void stop() {
        Iterator<String> it = this.sseSource.keySet().iterator();
        while (it.hasNext()) {
            unsubscribe(it.next());
        }
        this.run.set(false);
    }

    protected void getRequest(String str, String str2, MimeType mimeType) {
        Request build = new Request.Builder().url(str2).get().addHeader("Accept", mimeType.toString()).build();
        if (LOGGER.isTraceEnabled()) {
            LOGGER.atTrace().addArgument(this.endpoint).addArgument(str2).addArgument(build).log("new request for {} - {} : request{}");
        }
        RestCallBack restCallBack = new RestCallBack(str, str2, mimeType);
        this.pendingCallbacks.add(restCallBack);
        Call newCall = okClient.newCall(build);
        newCall.enqueue(restCallBack);
        if (this.cancelLastCall > 0) {
            newCall.cancel();
            this.cancelLastCall--;
        }
    }

    static {
        $assertionsDisabled = !RestDataSource.class.desiredAssertionStatus();
        FACTORY = new DataSource.Factory() { // from class: io.opencmw.client.rest.RestDataSource.1
            @Override // io.opencmw.client.DataSource.Factory
            public boolean matches(String str) {
                return (str == null || str.isBlank() || !str.toLowerCase(Locale.UK).startsWith("http")) ? false : true;
            }

            @Override // io.opencmw.client.DataSource.Factory
            public Class<? extends IoSerialiser> getMatchingSerialiserType(String str) {
                return JsonSerialiser.class;
            }

            @Override // io.opencmw.client.DataSource.Factory
            public DataSource newInstance(ZContext zContext, String str, Duration duration, String str2) {
                return new RestDataSource(zContext, str, duration, str2);
            }
        };
        LOGGER = LoggerFactory.getLogger(RestDataSource.class);
        REST_DATA_SOURCE_INSTANCE = new AtomicInteger();
        PUBLICATION_COUNTER = new AtomicLong();
    }
}
