package io.thundra.merloc.broker.client.impl;

import io.thundra.merloc.broker.client.BrokerClient;
import io.thundra.merloc.broker.client.BrokerConstants;
import io.thundra.merloc.broker.client.BrokerCredentials;
import io.thundra.merloc.broker.client.BrokerEnvelope;
import io.thundra.merloc.broker.client.BrokerMessage;
import io.thundra.merloc.broker.client.BrokerMessageCallback;
import io.thundra.merloc.broker.client.BrokerPayload;
import io.thundra.merloc.common.logger.StdLogger;
import io.thundra.merloc.common.utils.ExceptionUtils;
import io.thundra.merloc.common.utils.ExecutorUtils;
import io.thundra.merloc.common.utils.StringUtils;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.DeserializationFeature;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.ObjectMapper;
import io.thundra.merloc.thirdparty.com.fasterxml.jackson.databind.SerializationFeature;
import io.thundra.merloc.thirdparty.okhttp3.Dispatcher;
import io.thundra.merloc.thirdparty.okhttp3.OkHttpClient;
import io.thundra.merloc.thirdparty.okhttp3.Request;
import io.thundra.merloc.thirdparty.okhttp3.Response;
import io.thundra.merloc.thirdparty.okhttp3.WebSocket;
import io.thundra.merloc.thirdparty.okhttp3.WebSocketListener;
import io.thundra.merloc.thirdparty.okio.ByteString;
import java.io.IOException;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:io/thundra/merloc/broker/client/impl/OkHttpWebSocketBrokerClient.class */
public final class OkHttpWebSocketBrokerClient extends WebSocketListener implements BrokerClient {
    /** Name of the HTTP header that buildRequest sets to the full connection name on the handshake request. */
    private static final String API_KEY_HEADER_NAME = "x-api-key";
    /** Equal to the payload size threshold (in characters) that doSend applies when splitting a payload into fragments. */
    private static final int MAX_FRAME_SIZE = 16384;
    /** Close code sent by close() and onClosing(); destroyInFlightMessages treats this code as a normal close. */
    private static final int NORMAL_CLOSE_CODE = 1000;
    /** Shared base client (3 second read timeout, 30 second ping interval); the constructor derives a per-instance client from it via newBuilder(). */
    private static final OkHttpClient baseClient = new OkHttpClient.Builder().dispatcher(new Dispatcher(ExecutorUtils.newCachedExecutorService("broker-client-okhttp-dispatcher", false))).readTimeout(3, TimeUnit.SECONDS).pingInterval(30, TimeUnit.SECONDS).build();
    /** JSON mapper used to (de)serialize envelopes and payloads. */
    private final ObjectMapper objectMapper;
    /** Executor on which handleMessage runs the message callback. */
    private final ExecutorService messageHandlerExecutorService;
    /** Scheduler for the per-request timeout tasks created by sendAndGetResponse. */
    private final ScheduledExecutorService inFlightMessageCleanerExecutorService;
    /** Scheduler that runs EnvelopeGlue.cleanIdleEnvelopes once a minute (set up in the constructor). */
    private final ScheduledExecutorService idleEnvelopeCleanerExecutorService;
    /** Requests awaiting a response, keyed by message id. */
    private final Map<String, InFlightMessage> messageMap;
    /** Collects fragmented envelopes and reassembles them into messages. */
    private final EnvelopeGlue envelopeGlue;
    /** Per-instance client built from baseClient. */
    private final OkHttpClient client;
    /** The web socket opened in the constructor with this object as its listener. */
    private final WebSocket webSocket;
    /** Callback for incoming messages; handleMessage skips dispatching when it is null. */
    private final BrokerMessageCallback messageCallback;
    /** Completed with true in onOpen; completed exceptionally in onFailure if not connected at that point. */
    private final CompletableFuture<Boolean> connectedFuture;
    /** Completed with true in onClosed; completed exceptionally in onFailure. */
    private final CompletableFuture<Boolean> closedFuture;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/thundra/merloc/broker/client/impl/OkHttpWebSocketBrokerClient$BrokerEnvelopeKey.class */
    /**
     * Map key for the set of fragments that share one envelope id.
     *
     * <p>Only {@code id} takes part in {@link #equals(Object)} and {@link #hashCode()};
     * {@code initTime} is carried along and read by the idle-envelope cleaner.
     */
    public static class BrokerEnvelopeKey {
        private final String id;
        private final long initTime;

        private BrokerEnvelopeKey(String id, long initTime) {
            this.id = id;
            this.initTime = initTime;
        }

        /** Two keys are equal when they are of the same class and carry equal ids. */
        @Override
        public boolean equals(Object other) {
            if (other == this) {
                return true;
            }
            if (other != null && other.getClass() == getClass()) {
                BrokerEnvelopeKey that = (BrokerEnvelopeKey) other;
                return Objects.equals(id, that.id);
            }
            return false;
        }

        /** Hash derived from the id only, matching {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return Objects.hash(id);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/thundra/merloc/broker/client/impl/OkHttpWebSocketBrokerClient$EnvelopeGlue.class */
    public class EnvelopeGlue {
        private final long ENVELOPE_IDLE_TIMEOUT;
        private final Map<BrokerEnvelopeKey, Set<BrokerEnvelope>> envelopeMap;

        private EnvelopeGlue() {
            this.ENVELOPE_IDLE_TIMEOUT = TimeUnit.MINUTES.toMillis(1L);
            this.envelopeMap = new ConcurrentHashMap();
        }

        private void glueSiblingEnvelopesAndHandleMessage(Set<BrokerEnvelope> set) {
            BrokerEnvelope next = set.iterator().next();
            StringBuilder sb = new StringBuilder();
            Iterator<BrokerEnvelope> it = set.iterator();
            while (it.hasNext()) {
                sb.append(it.next().getPayload());
            }
            String sb2 = sb.toString();
            try {
                BrokerPayload brokerPayload = (BrokerPayload) OkHttpWebSocketBrokerClient.this.objectMapper.readValue(sb2, BrokerPayload.class);
                OkHttpWebSocketBrokerClient.this.handleMessage(new BrokerMessage().withId(next.getId()).withResponseOf(next.getResponseOf()).withConnectionName(next.getConnectionName()).withSourceConnectionId(next.getSourceConnectionId()).withSourceConnectionType(next.getSourceConnectionType()).withTargetConnectionId(next.getTargetConnectionId()).withTargetConnectionType(next.getTargetConnectionType()).withType(next.getType()).withData(brokerPayload.getData()).withError(brokerPayload.getError()));
            } catch (Throwable th) {
                StdLogger.error(String.format("Unable to deserialize broker message from glued data: %s", sb2), th);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void glue(BrokerEnvelope brokerEnvelope) {
            String id = brokerEnvelope.getId();
            int fragmentCount = brokerEnvelope.getFragmentCount();
            BrokerEnvelopeKey brokerEnvelopeKey = new BrokerEnvelopeKey(id, System.currentTimeMillis());
            Set<BrokerEnvelope> set = this.envelopeMap.get(brokerEnvelopeKey);
            if (set == null) {
                set = new ConcurrentSkipListSet((Comparator<? super BrokerEnvelope>) Comparator.comparingInt((v0) -> {
                    return v0.getFragmentNo();
                }));
                Set<BrokerEnvelope> putIfAbsent = this.envelopeMap.putIfAbsent(brokerEnvelopeKey, set);
                if (putIfAbsent != null) {
                    set = putIfAbsent;
                }
            }
            set.add(brokerEnvelope);
            if (set.size() == fragmentCount) {
                this.envelopeMap.remove(brokerEnvelopeKey);
                glueSiblingEnvelopesAndHandleMessage(set);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void cleanIdleEnvelopes() {
            long currentTimeMillis = System.currentTimeMillis();
            Iterator<BrokerEnvelopeKey> it = this.envelopeMap.keySet().iterator();
            while (it.hasNext()) {
                if (currentTimeMillis - it.next().initTime > this.ENVELOPE_IDLE_TIMEOUT) {
                    it.remove();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/thundra/merloc/broker/client/impl/OkHttpWebSocketBrokerClient$InFlightMessage.class */
    /**
     * Bookkeeping entry for a request that is waiting for its response: the future handed
     * to the waiting caller and the scheduled task that times the request out.
     */
    public class InFlightMessage {
        // Left raw on purpose: other members of the enclosing class complete it through this raw field.
        private final CompletableFuture completableFuture;
        private final ScheduledFuture<?> scheduledFuture;

        private InFlightMessage(CompletableFuture responseFuture, ScheduledFuture<?> timeoutTask) {
            this.scheduledFuture = timeoutTask;
            this.completableFuture = responseFuture;
        }
    }

    /**
     * Creates a client by delegating to the six-argument constructor, passing {@code null}
     * for the additional headers and for both futures.
     *
     * @param str the broker URL
     * @param brokerCredentials credentials used to build the connection name header
     * @param brokerMessageCallback callback for incoming messages
     */
    public OkHttpWebSocketBrokerClient(String str, BrokerCredentials brokerCredentials, BrokerMessageCallback brokerMessageCallback) {
        this(str, brokerCredentials, brokerMessageCallback, null, null, null);
    }

    /**
     * Creates the client, opens the web socket with this object as its listener and schedules
     * the idle-envelope cleaner to run every minute.
     *
     * @param str the broker URL
     * @param brokerCredentials credentials used to build the connection name header
     * @param brokerMessageCallback callback for incoming messages
     * @param map additional request headers, or {@code null} for none
     * @param completableFuture future to use as the "connected" signal, or {@code null} to create one
     * @param completableFuture2 future to use as the "closed" signal, or {@code null} to create one
     */
    public OkHttpWebSocketBrokerClient(String str, BrokerCredentials brokerCredentials, BrokerMessageCallback brokerMessageCallback, Map<String, String> map, CompletableFuture completableFuture, CompletableFuture completableFuture2) {
        ObjectMapper mapper = new ObjectMapper();
        mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        this.objectMapper = mapper;

        this.messageHandlerExecutorService = ExecutorUtils.newCachedExecutorService("broker-client-message-handler", false);
        this.inFlightMessageCleanerExecutorService = ExecutorUtils.newScheduledExecutorService(1, "broker-client-inflight-cleaner");
        this.idleEnvelopeCleanerExecutorService = ExecutorUtils.newScheduledExecutorService(1, "broker-client-envelope-cleaner");
        this.messageMap = new ConcurrentHashMap<>();
        this.envelopeGlue = new EnvelopeGlue();
        this.messageCallback = brokerMessageCallback;

        // Use the caller's futures when given, otherwise create private ones.
        if (completableFuture != null) {
            this.connectedFuture = completableFuture;
        } else {
            this.connectedFuture = new CompletableFuture<>();
        }
        if (completableFuture2 != null) {
            this.closedFuture = completableFuture2;
        } else {
            this.closedFuture = new CompletableFuture<>();
        }

        Request handshakeRequest = buildRequest(str, brokerCredentials, map);
        this.client = baseClient.newBuilder().build();
        this.webSocket = this.client.newWebSocket(handshakeRequest, this);

        this.idleEnvelopeCleanerExecutorService.scheduleAtFixedRate(this.envelopeGlue::cleanIdleEnvelopes, 1L, 1L, TimeUnit.MINUTES);
    }

    /**
     * Builds the value sent in the connection header: the client prefix followed by the
     * connection name and, when an API key is present, the separator and the API key.
     *
     * @param brokerCredentials the credentials; may be {@code null}
     * @return the full connection name, or {@code null} when there are no credentials or
     *         they carry no connection name
     */
    private static String getFullConnectionName(BrokerCredentials brokerCredentials) {
        if (brokerCredentials == null) {
            // Treated like credentials without a connection name: no header is sent.
            return null;
        }
        String connectionName = brokerCredentials.getConnectionName();
        if (connectionName == null) {
            return null;
        }
        String fullName = BrokerConstants.CLIENT_CONNECTION_NAME_PREFIX + connectionName;
        String apiKey = brokerCredentials.getApiKey();
        return apiKey == null ? fullName : fullName + BrokerConstants.CONNECTION_API_KEY_SEPARATOR + apiKey;
    }

    /**
     * Assembles the handshake request: the normalized broker URL, the connection name header
     * (when a full connection name is available) and any additional headers.
     *
     * @param str the broker URL
     * @param brokerCredentials credentials used to derive the connection name header
     * @param map additional headers, or {@code null} for none
     * @return the request to open the web socket with
     */
    private static Request buildRequest(String str, BrokerCredentials brokerCredentials, Map<String, String> map) {
        Request.Builder requestBuilder = new Request.Builder().url(normalizeBrokerUrl(str));

        String connectionName = getFullConnectionName(brokerCredentials);
        if (connectionName != null) {
            requestBuilder.header(API_KEY_HEADER_NAME, connectionName);
        }

        // Extra headers are applied last, so they replace an earlier header of the same name.
        if (map != null) {
            map.forEach(requestBuilder::header);
        }
        return requestBuilder.build();
    }

    /**
     * Ensures the broker URL carries a scheme, defaulting to {@code wss://}.
     *
     * <p>The scheme check is case-insensitive and surrounding whitespace is ignored, so
     * inputs such as {@code "WSS://host"} are no longer turned into
     * {@code "wss://WSS://host"}. URLs that already use {@code http://} or {@code https://}
     * are returned as they are.
     *
     * @param str the configured broker URL or bare host; must not be {@code null}
     * @return the trimmed URL, prefixed with {@code wss://} when it had no recognized scheme
     * @throws NullPointerException if {@code str} is {@code null}
     */
    private static String normalizeBrokerUrl(String str) {
        Objects.requireNonNull(str, "Broker URL must not be null");
        String url = str.trim();
        for (String scheme : new String[] {"ws://", "wss://", "http://", "https://"}) {
            if (url.regionMatches(true, 0, scheme, 0, scheme.length())) {
                return url;
            }
        }
        return "wss://" + url;
    }

    /**
     * Reports whether the connection future has completed without an exception.
     *
     * @return {@code true} once the connected future is done and did not complete exceptionally
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean isConnected() {
        if (!this.connectedFuture.isDone()) {
            return false;
        }
        return !this.connectedFuture.isCompletedExceptionally();
    }

    /**
     * Blocks until the connection attempt has finished.
     *
     * @return {@code true} if the connected future completed with {@code true}; {@code false}
     *         if it failed, completed with any other value, or the wait was interrupted
     *         (the thread's interrupt flag is restored in that case)
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean waitUntilConnected() {
        try {
            // The future may have been supplied by the caller as a raw type, so do not assume
            // a non-null Boolean result.
            return Boolean.TRUE.equals(this.connectedFuture.get());
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    /**
     * Blocks until the connection attempt has finished or the timeout elapses.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the connected future completed with {@code true} in time;
     *         {@code false} on failure, timeout, any other result, or interruption
     *         (the thread's interrupt flag is restored in that case)
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean waitUntilConnected(long j, TimeUnit timeUnit) {
        try {
            // The future may have been supplied by the caller as a raw type, so do not assume
            // a non-null Boolean result.
            return Boolean.TRUE.equals(this.connectedFuture.get(j, timeUnit));
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    /**
     * Sends the serialized payload of a message, either in a single envelope or, when the
     * payload is {@code MAX_FRAME_SIZE} characters or longer, as a sequence of fragment
     * envelopes numbered from zero.
     *
     * <p>Fragment boundaries never fall between the two halves of a surrogate pair: a lone
     * surrogate cannot be encoded as UTF-8 and would be replaced on the wire, corrupting
     * the payload once the fragments are concatenated again.
     *
     * @param brokerMessage the message whose envelope attributes are copied to every envelope
     * @param str the serialized payload
     * @throws IOException if an envelope cannot be serialized or the socket rejects it
     */
    private void doSend(BrokerMessage brokerMessage, String str) throws IOException {
        int length = str.length();
        if (length < MAX_FRAME_SIZE) {
            sendEnvelope(newEnvelope(brokerMessage).withPayload(str));
            return;
        }

        // First pass: count the fragments, since every fragment announces the total.
        int fragmentCount = 0;
        for (int start = 0; start < length; start = fragmentEnd(str, start)) {
            fragmentCount++;
        }

        int start = 0;
        for (int fragmentNo = 0; start < length; fragmentNo++) {
            int end = fragmentEnd(str, start);
            sendEnvelope(newEnvelope(brokerMessage)
                    .withPayload(str.substring(start, end))
                    .withFragmented(true)
                    .withFragmentNo(fragmentNo)
                    .withFragmentCount(fragmentCount));
            start = end;
        }
    }

    /** Creates an envelope carrying the routing attributes of the given message, without payload. */
    private static BrokerEnvelope newEnvelope(BrokerMessage brokerMessage) {
        return new BrokerEnvelope()
                .withId(brokerMessage.getId())
                .withResponseOf(brokerMessage.getResponseOf())
                .withConnectionName(brokerMessage.getConnectionName())
                .withSourceConnectionId(brokerMessage.getSourceConnectionId())
                .withSourceConnectionType(brokerMessage.getSourceConnectionType())
                .withTargetConnectionId(brokerMessage.getTargetConnectionId())
                .withTargetConnectionType(brokerMessage.getTargetConnectionType())
                .withType(brokerMessage.getType());
    }

    /** Serializes the envelope and enqueues it on the socket, failing if the socket rejects it. */
    private void sendEnvelope(BrokerEnvelope envelope) throws IOException {
        if (!this.webSocket.send(this.objectMapper.writeValueAsString(envelope))) {
            throw new IOException("Unable to send message");
        }
    }

    /** Returns the exclusive end index of the fragment starting at {@code start}, keeping surrogate pairs together. */
    private static int fragmentEnd(String payload, int start) {
        int end = Math.min(start + MAX_FRAME_SIZE, payload.length());
        if (end < payload.length()
                && Character.isHighSurrogate(payload.charAt(end - 1))
                && Character.isLowSurrogate(payload.charAt(end))) {
            end--;
        }
        return end;
    }

    /**
     * Sends a message without waiting for a response. A message without an id is given a
     * random UUID first; its data and error are then serialized as the payload.
     *
     * @param brokerMessage the message to send
     * @throws IOException if serialization fails or the message cannot be sent
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public void send(BrokerMessage brokerMessage) throws IOException {
        if (StringUtils.isNullOrEmpty(brokerMessage.getId())) {
            String generatedId = UUID.randomUUID().toString();
            brokerMessage.setId(generatedId);
        }
        BrokerPayload payload = new BrokerPayload()
                .withData(brokerMessage.getData())
                .withError(brokerMessage.getError());
        String serializedPayload = this.objectMapper.writeValueAsString(payload);
        doSend(brokerMessage, serializedPayload);
    }

    /**
     * Sends a message and waits for the message that answers it.
     *
     * <p>The request is registered as in-flight under its id together with a scheduled
     * timeout task. Whatever the outcome of the wait, the in-flight entry and the timeout
     * task are released before returning.
     *
     * @param brokerMessage the request; it is given a random UUID if it has no id
     * @param j maximum time to wait for the response
     * @param timeUnit unit of {@code j}
     * @return the response, or {@code null} when waiting failed (timeout, connection
     *         failure, interruption) or the in-flight future was completed with {@code null}
     * @throws IOException if the request cannot be sent
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public BrokerMessage sendAndGetResponse(BrokerMessage brokerMessage, long j, TimeUnit timeUnit) throws IOException {
        if (StringUtils.isNullOrEmpty(brokerMessage.getId())) {
            brokerMessage.setId(UUID.randomUUID().toString());
        }
        final String messageId = brokerMessage.getId();
        final CompletableFuture<BrokerMessage> responseFuture = new CompletableFuture<>();
        final ScheduledFuture<?> timeoutTask = this.inFlightMessageCleanerExecutorService.schedule(() -> {
            InFlightMessage timedOut = this.messageMap.remove(messageId);
            if (timedOut != null && timedOut.completableFuture != null) {
                timedOut.completableFuture.completeExceptionally(new TimeoutException(String.format("Message with id %s has timed-out", messageId)));
            }
        }, j, timeUnit);
        final InFlightMessage inFlightMessage = new InFlightMessage(responseFuture, timeoutTask);
        this.messageMap.put(messageId, inFlightMessage);
        try {
            send(brokerMessage);
        } catch (Throwable th) {
            // Remove only our own registration, then rethrow the original failure unchanged.
            this.messageMap.remove(messageId, inFlightMessage);
            timeoutTask.cancel(true);
            ExceptionUtils.sneakyThrow(th);
        }
        try {
            return responseFuture.get(j, timeUnit);
        } catch (InterruptedException e) {
            // Restore the flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            StdLogger.error("Unable to get response", e);
            return null;
        } catch (ExecutionException e) {
            StdLogger.error("Unable to get response", e.getCause() != null ? e.getCause() : e);
            return null;
        } catch (Throwable th) {
            StdLogger.error("Unable to get response", th);
            return null;
        } finally {
            // No-ops when the response path or the timeout task already cleaned up.
            this.messageMap.remove(messageId, inFlightMessage);
            timeoutTask.cancel(false);
        }
    }

    /**
     * Asks the socket to close with the given code and reason.
     *
     * @param i the close code
     * @param str the close reason
     * @throws IOException if the socket reports that the close could not be initiated
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public void sendCloseMessage(int i, String str) throws IOException {
        boolean closeInitiated = this.webSocket.close(i, str);
        if (closeInitiated) {
            return;
        }
        throw new IOException("Unable to send message");
    }

    /**
     * Asks the socket to close with the normal close code. This is best-effort: a failure
     * is logged at debug level and never propagated.
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public void close() {
        try {
            this.webSocket.close(NORMAL_CLOSE_CODE, null);
        } catch (Exception e) {
            // Deliberately not rethrown; leave a trace instead of swallowing silently.
            StdLogger.debug("Unable to close broker connection: " + e);
        }
    }

    /**
     * Cancels the socket and shuts down every executor owned by this client instance.
     * Cancelling is best-effort: a failure is logged at debug level and never propagated.
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public void destroy() {
        try {
            this.webSocket.cancel();
        } catch (Exception e) {
            // Deliberately not rethrown; the executors below must still be shut down.
            StdLogger.debug("Unable to cancel broker connection: " + e);
        }
        this.inFlightMessageCleanerExecutorService.shutdownNow();
        this.idleEnvelopeCleanerExecutorService.shutdownNow();
        // shutdown() rather than shutdownNow(): callbacks that are already submitted may finish,
        // but the handler pool no longer outlives the client.
        this.messageHandlerExecutorService.shutdown();
    }

    /**
     * Reports whether the closed future has completed without an exception.
     *
     * @return {@code true} once the closed future is done and did not complete exceptionally
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean isClosed() {
        if (!this.closedFuture.isDone()) {
            return false;
        }
        return !this.closedFuture.isCompletedExceptionally();
    }

    /**
     * Blocks until the connection has been closed.
     *
     * @return {@code true} if the closed future completed with {@code true}; {@code false}
     *         if it failed, completed with any other value, or the wait was interrupted
     *         (the thread's interrupt flag is restored in that case)
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean waitUntilClosed() {
        try {
            // The future may have been supplied by the caller as a raw type, so do not assume
            // a non-null Boolean result.
            return Boolean.TRUE.equals(this.closedFuture.get());
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            return false;
        }
    }

    /**
     * Blocks until the connection has been closed or the timeout elapses.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return {@code true} if the closed future completed with {@code true} in time;
     *         {@code false} on failure, timeout, any other result, or interruption
     *         (the thread's interrupt flag is restored in that case)
     */
    @Override // io.thundra.merloc.broker.client.BrokerClient
    public boolean waitUntilClosed(long j, TimeUnit timeUnit) {
        try {
            // The future may have been supplied by the caller as a raw type, so do not assume
            // a non-null Boolean result.
            return Boolean.TRUE.equals(this.closedFuture.get(j, timeUnit));
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }

    /**
     * Listener callback for an opened socket: logs the response message and completes the
     * connected future with {@code true}.
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onOpen(WebSocket webSocket, Response response) {
        String handshakeMessage = response.message();
        StdLogger.debug("OPEN: " + handshakeMessage);
        this.connectedFuture.complete(Boolean.TRUE);
    }

    /**
     * Listener callback for a text frame: logs it when debug logging is enabled and passes
     * it on for decoding.
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onMessage(WebSocket webSocket, String str) {
        if (StdLogger.DEBUG_ENABLED) {
            String logLine = "MESSAGE: " + str;
            StdLogger.debug(logLine);
        }
        receiveMessage(str);
    }

    /**
     * Listener callback for a binary frame: decodes the bytes as UTF-8 and then handles the
     * text exactly like a text frame (debug log, then decoding).
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onMessage(WebSocket webSocket, ByteString byteString) {
        // Delegates to the String overload, which performs the same logging and dispatch.
        onMessage(webSocket, byteString.utf8());
    }

    /**
     * Decodes one incoming envelope. Fragmented envelopes are handed to the envelope glue;
     * complete ones have their payload deserialized and are dispatched as a message.
     * Envelopes with an empty payload are logged and dropped; any failure is logged, never thrown.
     *
     * @param str the raw envelope JSON
     */
    private void receiveMessage(String str) {
        try {
            BrokerEnvelope envelope = this.objectMapper.readValue(str, BrokerEnvelope.class);
            String rawPayload = envelope.getPayload();
            if (StringUtils.isNullOrEmpty(rawPayload)) {
                StdLogger.error("Empty payload in envelope");
                return;
            }
            if (envelope.isFragmented()) {
                this.envelopeGlue.glue(envelope);
                return;
            }
            BrokerPayload payload = this.objectMapper.readValue(rawPayload, BrokerPayload.class);
            if (payload == null) {
                StdLogger.error("Empty payload in envelope");
                return;
            }
            BrokerMessage message = new BrokerMessage()
                    .withId(envelope.getId())
                    .withResponseOf(envelope.getResponseOf())
                    .withConnectionName(envelope.getConnectionName())
                    .withSourceConnectionId(envelope.getSourceConnectionId())
                    .withSourceConnectionType(envelope.getSourceConnectionType())
                    .withTargetConnectionId(envelope.getTargetConnectionId())
                    .withTargetConnectionType(envelope.getTargetConnectionType())
                    .withType(envelope.getType())
                    .withData(payload.getData())
                    .withError(payload.getError());
            handleMessage(message);
        } catch (Throwable th) {
            StdLogger.error(String.format("Unable to deserialize broker message: %s", str), th);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Processes a decoded message. If it answers an in-flight request, that request's timeout
     * task is cancelled and its future completed with the message. Independently of that, the
     * message is passed to the message callback (when one is set) on the handler executor.
     * Failures are logged, never thrown.
     *
     * @param brokerMessage the decoded message
     */
    public void handleMessage(BrokerMessage brokerMessage) {
        try {
            String responseOf = brokerMessage.getResponseOf();
            if (StringUtils.hasValue(responseOf)) {
                InFlightMessage pending = this.messageMap.remove(responseOf);
                if (pending != null) {
                    if (pending.scheduledFuture != null) {
                        pending.scheduledFuture.cancel(true);
                    }
                    if (pending.completableFuture != null) {
                        pending.completableFuture.complete(brokerMessage);
                    }
                }
            }
            if (this.messageCallback == null) {
                return;
            }
            this.messageHandlerExecutorService.submit(() -> {
                try {
                    this.messageCallback.onMessage(this, brokerMessage);
                } catch (Throwable callbackFailure) {
                    StdLogger.error(String.format("Unable to handle broker message: %s", brokerMessage), callbackFailure);
                }
            });
        } catch (Throwable th) {
            StdLogger.error(String.format("Unable to handle broker message: %s", brokerMessage), th);
        }
    }

    /**
     * Listener callback for a closing socket: logs the code and reason, then calls close on
     * the socket with the normal close code.
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onClosing(WebSocket webSocket, int i, String str) {
        StringBuilder logLine = new StringBuilder("CLOSING: ");
        logLine.append(i).append(" ").append(str);
        StdLogger.debug(logLine.toString());
        webSocket.close(NORMAL_CLOSE_CODE, null);
    }

    /**
     * Listener callback for a closed socket: logs the code and reason, completes the closed
     * future with {@code true} and finishes all requests that are still in flight.
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onClosed(WebSocket webSocket, int i, String str) {
        StringBuilder logLine = new StringBuilder("CLOSED: ");
        logLine.append(i).append(" ").append(str);
        StdLogger.debug(logLine.toString());
        this.closedFuture.complete(Boolean.TRUE);
        destroyInFlightMessages(i, str);
    }

    /**
     * Listener callback for a failed socket: logs the failure and, when available, the
     * response body; fails the connected future if the client is not connected, fails the
     * closed future, and fails all requests that are still in flight.
     */
    @Override // io.thundra.merloc.thirdparty.okhttp3.WebSocketListener
    public void onFailure(WebSocket webSocket, Throwable th, Response response) {
        StdLogger.error("FAILED: ", th);
        try {
            if (response != null && response.body() != null) {
                StdLogger.error(response.body().string());
            }
        } catch (Exception ignored) {
            // Reading the body is only for diagnostics; a failure here is ignored.
        }
        boolean connected = isConnected();
        if (!connected) {
            this.connectedFuture.completeExceptionally(th);
        }
        this.closedFuture.completeExceptionally(th);
        // -1 is not the normal close code, so in-flight requests fail with an IOException.
        destroyInFlightMessages(-1, th.getMessage());
    }

    /**
     * Removes every in-flight request, cancels its timeout task and finishes its future:
     * with {@code null} when the close code is the normal close code, otherwise with an
     * {@link IOException} describing the code and reason.
     *
     * @param i the close code
     * @param str the close reason used in the exception message
     */
    private void destroyInFlightMessages(int i, String str) {
        for (Iterator<InFlightMessage> pendingIterator = this.messageMap.values().iterator(); pendingIterator.hasNext(); ) {
            InFlightMessage pending = pendingIterator.next();
            pendingIterator.remove();
            if (pending.scheduledFuture != null) {
                pending.scheduledFuture.cancel(true);
            }
            if (pending.completableFuture == null) {
                continue;
            }
            if (i != NORMAL_CLOSE_CODE) {
                pending.completableFuture.completeExceptionally(new IOException(String.format("Connection is closed (code=%d, reason=%s)", i, str)));
            } else {
                pending.completableFuture.complete(null);
            }
        }
    }
}
